Alexander Rössler
Published © MIT

Controlling TP-Link HS100/110 Smart Plugs with Machinekit

This article is about controlling the TP-Link HS100/HS110 smart home automation power sockets via Machinekit HAL.

Intermediate • Full instructions provided • 2 hours • 6,769
Controlling TP-Link HS100/110 Smart Plugs with Machinekit

Things used in this project

Hardware components

Zolertia TP-Link HS110
×1

Software apps and online services

Machinekit

Story

Read more

Code

Code snippet #1

Python
#!/usr/bin/env python

import time
import argparse
import hal

def main():
    """Run a demo HAL component that toggles its ``enable`` pin periodically.

    Command line options:
        -n/--name      HAL component name (required)
        -i/--interval  seconds between two updates (default 0.5)

    The loop runs until it is interrupted with Ctrl-C.  The component's
    ``exit()`` is called however the loop ends.
    """
    parser = argparse.ArgumentParser(description='HAL component to control TP-Link HS100/HS110 smartplugs')
    parser.add_argument('-n', '--name', help='HAL component name', required=True)
    # type=float lets argparse reject a non-numeric interval with a usage
    # message instead of a ValueError traceback later on
    parser.add_argument('-i', '--interval', help='Value update interval', type=float, default=0.5)

    # parse arguments
    args = parser.parse_args()
    updateInterval = args.interval

    # create HAL component
    h = hal.component(args.name)
    enablePin = h.newpin('enable', hal.HAL_BIT, hal.HAL_IO)
    h.ready()

    try:
        while True:  # main loop
            startTime = time.time()

            # processing
            enablePin.value = not enablePin.value

            # sleep for the rest of the period; never a negative duration
            sleepTime = updateInterval - (time.time() - startTime)  # corrects for processing time
            time.sleep(max(sleepTime, 0.0))

    except KeyboardInterrupt:
        print(("exiting HAL component " + args.name))
    finally:
        # runs for Ctrl-C and for unexpected errors alike
        h.exit()


if __name__ == "__main__":
    main()

Code snippet #2

Python
# Predefined Smart Plug Commands
# For a full list of commands, consult tplink_commands.txt
# Maps a short command name to the JSON request text sent to the plug.
commands = dict(
    info='{"system":{"get_sysinfo":{}}}',
    on='{"system":{"set_relay_state":{"state":1}}}',
    off='{"system":{"set_relay_state":{"state":0}}}',
    cloudinfo='{"cnCloud":{"get_info":{}}}',
    wlanscan='{"netif":{"get_scaninfo":{"refresh":0}}}',
    time='{"time":{"get_time":{}}}',
    schedule='{"schedule":{"get_rules":{}}}',
    countdown='{"count_down":{"get_rules":{}}}',
    antitheft='{"anti_theft":{"get_rules":{}}}',
    reboot='{"system":{"reboot":{"delay":1}}}',
    reset='{"system":{"reset":{"delay":1}}}',
)

# Encryption and Decryption of TP-Link Smart Home Protocol
# XOR Autokey Cipher with starting key = 171
def encrypt(string):
    """Return ``string`` obfuscated with the XOR autokey cipher.

    The key starts at 171; every output character is the input character
    XOR-ed with the key, and that output becomes the next key.  The result
    is prefixed with four NUL characters.
    """
    pieces = ["\0\0\0\0"]
    running = 171
    for symbol in string:
        running ^= ord(symbol)
        pieces.append(chr(running))
    return "".join(pieces)

def decrypt(string):
    """Reverse the XOR autokey cipher applied to ``string``.

    The key starts at 171; every output character is the input character
    XOR-ed with the key, and the *input* character becomes the next key.
    ``string`` must not include a length/header prefix.
    """
    plain = []
    previous = 171
    for symbol in string:
        code = ord(symbol)
        plain.append(chr(previous ^ code))
        previous = code
    return "".join(plain)

Code snippet #3

Python
port = 9999  # TCP port the plug is contacted on
sock_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    sock_tcp.connect((ip, port))
    # sendall() keeps writing until the whole command is out; send() may
    # return after writing only part of it
    sock_tcp.sendall(encrypt(cmd))
    # a single read of at most 2048 bytes is taken as the complete reply
    data = sock_tcp.recv(2048)
finally:
    # release the socket even when connect/send/recv raises
    sock_tcp.close()

Code snippet #4

Plain text
Get System Info (Software & Hardware Versions, MAC, deviceID, hwID etc.)
{"system":{"get_sysinfo":null}}

Turn On
{"system":{"set_relay_state":{"state":1}}}

Turn Off
{"system":{"set_relay_state":{"state":0}}}

Get Realtime Current and Voltage Reading
{"emeter":{"get_realtime":{}}}

Code snippet #5

Python
class HS1xx(object):
    """Client for a single TP-Link HS100/HS110 smart plug.

    Each command opens a short-lived TCP connection, sends one obfuscated
    JSON request and parses one reply.  Results are cached on the instance:

    * ``enable`` -- relay state as last reported by, or written to, the plug
    * ``error``  -- set when an exchange fails; cleared by :meth:`update`
    * ``voltage``, ``current``, ``power``, ``energy`` -- last energy-meter
      readings; only present when ``emeter`` is true
    """

    def __init__(self, ip, emeter=True):
        """Store the connection settings; no network traffic happens here.

        :param ip: address of the plug, passed to ``socket.connect``.
        :param emeter: when true, :meth:`update` also queries the energy meter.
        """
        self.ip = ip
        self.port = 9999  # standard port
        self.connected = False
        self.timeout = 0.25  # socket timeout, in seconds
        self.socket = None
        self.recv_buffer = 2048  # maximum number of bytes read per reply
        # status
        self.enable = False
        self.error = False
        self.emeter = emeter
        if self.emeter:
            self.voltage = 0.0
            self.current = 0.0
            self.power = 0.0
            self.energy = 0.0

        # prepare the update command: one request that carries every query
        commands = ['"system":{"get_sysinfo":null}']
        if self.emeter:
            commands.append('"emeter":{"get_realtime":{}}')
        self.update_command = '{%s}' % ','.join(commands)

    # source: https://github.com/softScheck/tplink-smartplug
    # Encryption and Decryption of TP-Link Smart Home Protocol
    # XOR Autokey Cipher with starting key = 171
    def encrypt(self, string):
        """Return ``string`` obfuscated, prefixed with four NUL characters.

        Every output character is the input character XOR-ed with the key;
        that output then becomes the next key.
        """
        key = 171
        chars = ["\0\0\0\0"]
        for char in string:
            key ^= ord(char)
            chars.append(chr(key))
        return "".join(chars)

    def decrypt(self, string):
        """Reverse :meth:`encrypt` for a payload without the 4-character prefix.

        Every output character is the input character XOR-ed with the key;
        the *input* character then becomes the next key.
        """
        key = 171
        chars = []
        for char in string:
            code = ord(char)
            chars.append(chr(key ^ code))
            key = code
        return "".join(chars)

    @staticmethod
    def _toWire(text):
        """Return ``text`` as a byte string that a socket accepts."""
        if isinstance(text, bytes):  # Python 2: str already is a byte string
            return text
        # encrypt() only produces code points 0-255 for ASCII/Latin-1 input,
        # and latin-1 maps those 1:1 onto byte values
        return text.encode('latin-1')

    @staticmethod
    def _fromWire(raw):
        """Return received bytes as a ``str`` that :meth:`decrypt` can walk."""
        if isinstance(raw, str):  # Python 2: recv() already returned str
            return raw
        # NOTE(review): non-ASCII text inside a reply is not re-decoded as
        # UTF-8 here; the fields read by this class are numeric.
        return raw.decode('latin-1')

    def connectSocket(self):
        """Open a connection and keep it in ``self.socket``.

        ``self.connected`` tells whether the attempt succeeded.  Note that
        :meth:`socketCmd` does not use this socket; it opens its own
        connection for every command.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(self.timeout)
        try:
            sock.connect((self.ip, self.port))
        except socket.error:
            sock.close()  # do not leak the descriptor of a failed attempt
            self.socket = None
            self.connected = False
            return
        self.socket = sock
        self.connected = True

    def closeSocket(self):
        """Close the socket opened by :meth:`connectSocket`, if there is one."""
        if self.socket is not None:
            try:
                self.socket.close()
            finally:
                self.socket = None
        self.connected = False

    def socketCmd(self, cmd):
        """Send one JSON command and return the parsed reply.

        :param cmd: JSON request text.
        :returns: the reply parsed by ``json.loads``, or ``None`` after a
            socket error/timeout or an unparsable reply.  In the ``None``
            case ``self.error`` has been set.
        """
        sock = None
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(self.timeout)
            sock.connect((self.ip, self.port))
            # sendall() writes the whole request; send() may stop early
            sock.sendall(self._toWire(self.encrypt(cmd)))
            reply = self._fromWire(sock.recv(self.recv_buffer))
            # the first four characters of a reply are a header, not payload
            return json.loads(self.decrypt(reply[4:]))
        except (socket.error, ValueError):
            # ValueError covers an empty or truncated reply that is not JSON
            self.handleError()
            return None
        finally:
            if sock is not None:
                sock.close()

    def handleError(self):
        """Record that the current exchange failed."""
        self.error = True

    def update(self):
        """Clear the error flag and refresh the cached state from the plug."""
        self.error = False
        self.updateStatus()

    def _checkedReply(self, data, module, command):
        """Return ``data[module][command]`` when it reports no error.

        Sets the error flag and returns ``None`` when the section is missing
        or its ``err_code`` is non-zero.
        """
        try:
            reply = data[module][command]
            failed = bool(reply['err_code'])
        except (KeyError, TypeError):
            failed = True
        if failed:
            self.handleError()
            return None
        return reply

    def updateStatus(self):
        """Query the plug and copy relay state and meter readings to ``self``.

        Values are only overwritten by a complete, error-free reply section;
        anything else sets ``self.error`` and leaves the old values in place.
        """
        data = self.socketCmd(self.update_command)
        if data is None:
            return

        # update relay state
        sysinfo = self._checkedReply(data, 'system', 'get_sysinfo')
        if sysinfo is not None:
            try:
                self.enable = sysinfo['relay_state']
            except KeyError:
                self.handleError()
        if not self.emeter:
            return

        # update emeter state
        realtime = self._checkedReply(data, 'emeter', 'get_realtime')
        if realtime is None:
            return
        try:
            # read everything first so a missing field cannot leave the
            # readings half updated
            readings = [realtime[key] for key in ('current', 'voltage', 'power', 'total')]
        except KeyError:
            self.handleError()
            return
        self.current, self.voltage, self.power, self.energy = readings

    def setRelayState(self, state):
        """Switch the relay and, on success, remember the new state.

        :param state: truthy to switch on, falsy to switch off.

        On failure ``self.error`` is set and ``self.enable`` is unchanged.
        """
        cmd = '{"system":{"set_relay_state":{"state":%i}}}' % int(state)
        data = self.socketCmd(cmd)
        if data is None:
            return
        if self._checkedReply(data, 'system', 'set_relay_state') is not None:
            # 'enable' is the attribute initialised in __init__ and refreshed
            # by updateStatus()
            self.enable = state

Code snippet #6

Python
# create HAL component
h = hal.component(args.name)
# 'enable' is created with HAL_IO; the update code both reads it and writes it
enablePin = h.newpin('enable', hal.HAL_BIT, hal.HAL_IO)
# 'error' mirrors plug.error after every update cycle
errorPin = h.newpin('error', hal.HAL_BIT, hal.HAL_OUT)
# the measurement pins only exist when the energy meter is enabled
if emeter:
    currentPin = h.newpin('current', hal.HAL_FLOAT, hal.HAL_OUT)
    voltagePin = h.newpin('voltage', hal.HAL_FLOAT, hal.HAL_OUT)
    powerPin = h.newpin('power', hal.HAL_FLOAT, hal.HAL_OUT)
    energyPin = h.newpin('energy', hal.HAL_FLOAT, hal.HAL_OUT)
h.ready()

# last values seen on each side; used to tell which side changed
last_hal_enable = False
last_plug_enable = False
plug = HS1xx(address, emeter)
# replaces the 0.25 default that HS1xx.__init__ assigns
plug.timeout = timeout

Code snippet #7

Python
# update device status
# One update cycle: poll the plug, publish its readings, then reconcile the
# HAL 'enable' pin with the relay state in whichever direction changed.
plug.update()
if not plug.error:  # update may return an error
    if emeter:  # update emeter values if enabled
        currentPin.value = plug.current
        voltagePin.value = plug.voltage
        powerPin.value = plug.power
        energyPin.value = plug.energy
    enable = enablePin.value
    # update relay state
    # HAL side changed since the last accepted value and disagrees with the
    # plug: send the new state to the plug
    if last_hal_enable != enable and enable != plug.enable:
        plug.setRelayState(enable)
        # only accept the new HAL value when the command succeeded, so a
        # failed command is retried on the next cycle
        if not plug.error:
            last_hal_enable = enable
    # otherwise, the plug's reported state changed since it was last seen:
    # copy it onto the HAL pin
    elif last_plug_enable != plug.enable:
        last_hal_enable = plug.enable
        enablePin.value = plug.enable
        last_plug_enable = plug.enable
# published every cycle, including cycles in which the update failed
errorPin.value = plug.error

Code snippet #8

Python
def setup_smartplugs():
    """Load one ``hal_smartplug`` component per plug and link its enable pin.

    Each entry below is (component name, plug address, link target).  The
    components are loaded in the listed order, waiting for each by name.
    """
    plugs = (
        # first smartplug - machine power
        ("smartplug-power", "10.0.0.8", 'motion.digital-out-io-15'),
        # second smartplug - fan control
        ("smartplug-fan", "10.0.0.7", 'motion.digital-out-io-16'),
    )
    for name, address, target in plugs:
        command = 'hal_smartplug -n %s -e -a %s' % (name, address)
        smartplug = hal.loadusr(command, wait_name=name)
        smartplug.pin('enable').link(target)

Code snippet #9

Python
...
# Smartplugs
hardware.setup_smartplugs()
# Setup Hardware
...

Code snippet #10

Plain text
#!/usr/bin/env python

import time
import argparse
import hal

def main():
    """Run a demo HAL component that toggles its ``enable`` pin periodically.

    Command line options:
        -n/--name      HAL component name (required)
        -i/--interval  seconds between two updates (default 0.5)

    The loop runs until it is interrupted with Ctrl-C.  The component's
    ``exit()`` is called however the loop ends.
    """
    parser = argparse.ArgumentParser(description='HAL component to control TP-Link HS100/HS110 smartplugs')
    parser.add_argument('-n', '--name', help='HAL component name', required=True)
    # type=float lets argparse reject a non-numeric interval with a usage
    # message instead of a ValueError traceback later on
    parser.add_argument('-i', '--interval', help='Value update interval', type=float, default=0.5)

    # parse arguments
    args = parser.parse_args()
    updateInterval = args.interval

    # create HAL component
    h = hal.component(args.name)
    enablePin = h.newpin('enable', hal.HAL_BIT, hal.HAL_IO)
    h.ready()

    try:
        while True:  # main loop
            startTime = time.time()

            # processing
            enablePin.value = not enablePin.value

            # sleep for the rest of the period; never a negative duration
            sleepTime = updateInterval - (time.time() - startTime)  # corrects for processing time
            time.sleep(max(sleepTime, 0.0))

    except KeyboardInterrupt:
        print(("exiting HAL component " + args.name))
    finally:
        # runs for Ctrl-C and for unexpected errors alike
        h.exit()


if __name__ == "__main__":
    main()

Code snippet #11

Plain text
# Predefined Smart Plug Commands
# For a full list of commands, consult tplink_commands.txt
# Maps a short command name to the JSON request text sent to the plug.
commands = dict(
    info='{"system":{"get_sysinfo":{}}}',
    on='{"system":{"set_relay_state":{"state":1}}}',
    off='{"system":{"set_relay_state":{"state":0}}}',
    cloudinfo='{"cnCloud":{"get_info":{}}}',
    wlanscan='{"netif":{"get_scaninfo":{"refresh":0}}}',
    time='{"time":{"get_time":{}}}',
    schedule='{"schedule":{"get_rules":{}}}',
    countdown='{"count_down":{"get_rules":{}}}',
    antitheft='{"anti_theft":{"get_rules":{}}}',
    reboot='{"system":{"reboot":{"delay":1}}}',
    reset='{"system":{"reset":{"delay":1}}}',
)

# Encryption and Decryption of TP-Link Smart Home Protocol
# XOR Autokey Cipher with starting key = 171
def encrypt(string):
    """Return ``string`` obfuscated with the XOR autokey cipher.

    The key starts at 171; every output character is the input character
    XOR-ed with the key, and that output becomes the next key.  The result
    is prefixed with four NUL characters.
    """
    pieces = ["\0\0\0\0"]
    running = 171
    for symbol in string:
        running ^= ord(symbol)
        pieces.append(chr(running))
    return "".join(pieces)

def decrypt(string):
    """Reverse the XOR autokey cipher applied to ``string``.

    The key starts at 171; every output character is the input character
    XOR-ed with the key, and the *input* character becomes the next key.
    ``string`` must not include a length/header prefix.
    """
    plain = []
    previous = 171
    for symbol in string:
        code = ord(symbol)
        plain.append(chr(previous ^ code))
        previous = code
    return "".join(plain)

Code snippet #12

Plain text
port = 9999  # TCP port the plug is contacted on
sock_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    sock_tcp.connect((ip, port))
    # sendall() keeps writing until the whole command is out; send() may
    # return after writing only part of it
    sock_tcp.sendall(encrypt(cmd))
    # a single read of at most 2048 bytes is taken as the complete reply
    data = sock_tcp.recv(2048)
finally:
    # release the socket even when connect/send/recv raises
    sock_tcp.close()

Code snippet #13

Plain text
Get System Info (Software & Hardware Versions, MAC, deviceID, hwID etc.)
{"system":{"get_sysinfo":null}}

Turn On
{"system":{"set_relay_state":{"state":1}}}

Turn Off
{"system":{"set_relay_state":{"state":0}}}

Get Realtime Current and Voltage Reading
{"emeter":{"get_realtime":{}}}

Code snippet #14

Plain text
class HS1xx(object):
    """Client for a single TP-Link HS100/HS110 smart plug.

    Each command opens a short-lived TCP connection, sends one obfuscated
    JSON request and parses one reply.  Results are cached on the instance:

    * ``enable`` -- relay state as last reported by, or written to, the plug
    * ``error``  -- set when an exchange fails; cleared by :meth:`update`
    * ``voltage``, ``current``, ``power``, ``energy`` -- last energy-meter
      readings; only present when ``emeter`` is true
    """

    def __init__(self, ip, emeter=True):
        """Store the connection settings; no network traffic happens here.

        :param ip: address of the plug, passed to ``socket.connect``.
        :param emeter: when true, :meth:`update` also queries the energy meter.
        """
        self.ip = ip
        self.port = 9999  # standard port
        self.connected = False
        self.timeout = 0.25  # socket timeout, in seconds
        self.socket = None
        self.recv_buffer = 2048  # maximum number of bytes read per reply
        # status
        self.enable = False
        self.error = False
        self.emeter = emeter
        if self.emeter:
            self.voltage = 0.0
            self.current = 0.0
            self.power = 0.0
            self.energy = 0.0

        # prepare the update command: one request that carries every query
        commands = ['"system":{"get_sysinfo":null}']
        if self.emeter:
            commands.append('"emeter":{"get_realtime":{}}')
        self.update_command = '{%s}' % ','.join(commands)

    # source: https://github.com/softScheck/tplink-smartplug
    # Encryption and Decryption of TP-Link Smart Home Protocol
    # XOR Autokey Cipher with starting key = 171
    def encrypt(self, string):
        """Return ``string`` obfuscated, prefixed with four NUL characters.

        Every output character is the input character XOR-ed with the key;
        that output then becomes the next key.
        """
        key = 171
        chars = ["\0\0\0\0"]
        for char in string:
            key ^= ord(char)
            chars.append(chr(key))
        return "".join(chars)

    def decrypt(self, string):
        """Reverse :meth:`encrypt` for a payload without the 4-character prefix.

        Every output character is the input character XOR-ed with the key;
        the *input* character then becomes the next key.
        """
        key = 171
        chars = []
        for char in string:
            code = ord(char)
            chars.append(chr(key ^ code))
            key = code
        return "".join(chars)

    @staticmethod
    def _toWire(text):
        """Return ``text`` as a byte string that a socket accepts."""
        if isinstance(text, bytes):  # Python 2: str already is a byte string
            return text
        # encrypt() only produces code points 0-255 for ASCII/Latin-1 input,
        # and latin-1 maps those 1:1 onto byte values
        return text.encode('latin-1')

    @staticmethod
    def _fromWire(raw):
        """Return received bytes as a ``str`` that :meth:`decrypt` can walk."""
        if isinstance(raw, str):  # Python 2: recv() already returned str
            return raw
        # NOTE(review): non-ASCII text inside a reply is not re-decoded as
        # UTF-8 here; the fields read by this class are numeric.
        return raw.decode('latin-1')

    def connectSocket(self):
        """Open a connection and keep it in ``self.socket``.

        ``self.connected`` tells whether the attempt succeeded.  Note that
        :meth:`socketCmd` does not use this socket; it opens its own
        connection for every command.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(self.timeout)
        try:
            sock.connect((self.ip, self.port))
        except socket.error:
            sock.close()  # do not leak the descriptor of a failed attempt
            self.socket = None
            self.connected = False
            return
        self.socket = sock
        self.connected = True

    def closeSocket(self):
        """Close the socket opened by :meth:`connectSocket`, if there is one."""
        if self.socket is not None:
            try:
                self.socket.close()
            finally:
                self.socket = None
        self.connected = False

    def socketCmd(self, cmd):
        """Send one JSON command and return the parsed reply.

        :param cmd: JSON request text.
        :returns: the reply parsed by ``json.loads``, or ``None`` after a
            socket error/timeout or an unparsable reply.  In the ``None``
            case ``self.error`` has been set.
        """
        sock = None
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(self.timeout)
            sock.connect((self.ip, self.port))
            # sendall() writes the whole request; send() may stop early
            sock.sendall(self._toWire(self.encrypt(cmd)))
            reply = self._fromWire(sock.recv(self.recv_buffer))
            # the first four characters of a reply are a header, not payload
            return json.loads(self.decrypt(reply[4:]))
        except (socket.error, ValueError):
            # ValueError covers an empty or truncated reply that is not JSON
            self.handleError()
            return None
        finally:
            if sock is not None:
                sock.close()

    def handleError(self):
        """Record that the current exchange failed."""
        self.error = True

    def update(self):
        """Clear the error flag and refresh the cached state from the plug."""
        self.error = False
        self.updateStatus()

    def _checkedReply(self, data, module, command):
        """Return ``data[module][command]`` when it reports no error.

        Sets the error flag and returns ``None`` when the section is missing
        or its ``err_code`` is non-zero.
        """
        try:
            reply = data[module][command]
            failed = bool(reply['err_code'])
        except (KeyError, TypeError):
            failed = True
        if failed:
            self.handleError()
            return None
        return reply

    def updateStatus(self):
        """Query the plug and copy relay state and meter readings to ``self``.

        Values are only overwritten by a complete, error-free reply section;
        anything else sets ``self.error`` and leaves the old values in place.
        """
        data = self.socketCmd(self.update_command)
        if data is None:
            return

        # update relay state
        sysinfo = self._checkedReply(data, 'system', 'get_sysinfo')
        if sysinfo is not None:
            try:
                self.enable = sysinfo['relay_state']
            except KeyError:
                self.handleError()
        if not self.emeter:
            return

        # update emeter state
        realtime = self._checkedReply(data, 'emeter', 'get_realtime')
        if realtime is None:
            return
        try:
            # read everything first so a missing field cannot leave the
            # readings half updated
            readings = [realtime[key] for key in ('current', 'voltage', 'power', 'total')]
        except KeyError:
            self.handleError()
            return
        self.current, self.voltage, self.power, self.energy = readings

    def setRelayState(self, state):
        """Switch the relay and, on success, remember the new state.

        :param state: truthy to switch on, falsy to switch off.

        On failure ``self.error`` is set and ``self.enable`` is unchanged.
        """
        cmd = '{"system":{"set_relay_state":{"state":%i}}}' % int(state)
        data = self.socketCmd(cmd)
        if data is None:
            return
        if self._checkedReply(data, 'system', 'set_relay_state') is not None:
            # 'enable' is the attribute initialised in __init__ and refreshed
            # by updateStatus()
            self.enable = state

Code snippet #15

Plain text
# create HAL component
h = hal.component(args.name)
# 'enable' is created with HAL_IO; the update code both reads it and writes it
enablePin = h.newpin('enable', hal.HAL_BIT, hal.HAL_IO)
# 'error' mirrors plug.error after every update cycle
errorPin = h.newpin('error', hal.HAL_BIT, hal.HAL_OUT)
# the measurement pins only exist when the energy meter is enabled
if emeter:
    currentPin = h.newpin('current', hal.HAL_FLOAT, hal.HAL_OUT)
    voltagePin = h.newpin('voltage', hal.HAL_FLOAT, hal.HAL_OUT)
    powerPin = h.newpin('power', hal.HAL_FLOAT, hal.HAL_OUT)
    energyPin = h.newpin('energy', hal.HAL_FLOAT, hal.HAL_OUT)
h.ready()

# last values seen on each side; used to tell which side changed
last_hal_enable = False
last_plug_enable = False
plug = HS1xx(address, emeter)
# replaces the 0.25 default that HS1xx.__init__ assigns
plug.timeout = timeout

Code snippet #16

Plain text
# update device status
# One update cycle: poll the plug, publish its readings, then reconcile the
# HAL 'enable' pin with the relay state in whichever direction changed.
plug.update()
if not plug.error:  # update may return an error
    if emeter:  # update emeter values if enabled
        currentPin.value = plug.current
        voltagePin.value = plug.voltage
        powerPin.value = plug.power
        energyPin.value = plug.energy
    enable = enablePin.value
    # update relay state
    # HAL side changed since the last accepted value and disagrees with the
    # plug: send the new state to the plug
    if last_hal_enable != enable and enable != plug.enable:
        plug.setRelayState(enable)
        # only accept the new HAL value when the command succeeded, so a
        # failed command is retried on the next cycle
        if not plug.error:
            last_hal_enable = enable
    # otherwise, the plug's reported state changed since it was last seen:
    # copy it onto the HAL pin
    elif last_plug_enable != plug.enable:
        last_hal_enable = plug.enable
        enablePin.value = plug.enable
        last_plug_enable = plug.enable
# published every cycle, including cycles in which the update failed
errorPin.value = plug.error

Code snippet #17

Plain text
def setup_smartplugs():
    """Load one ``hal_smartplug`` component per plug and link its enable pin.

    Each entry below is (component name, plug address, link target).  The
    components are loaded in the listed order, waiting for each by name.
    """
    plugs = (
        # first smartplug - machine power
        ("smartplug-power", "10.0.0.8", 'motion.digital-out-io-15'),
        # second smartplug - fan control
        ("smartplug-fan", "10.0.0.7", 'motion.digital-out-io-16'),
    )
    for name, address, target in plugs:
        command = 'hal_smartplug -n %s -e -a %s' % (name, address)
        smartplug = hal.loadusr(command, wait_name=name)
        smartplug.pin('enable').link(target)

Code snippet #18

Plain text
...
# Smartplugs
hardware.setup_smartplugs()
# Setup Hardware
...

Github

https://github.com/machinekoder/hal_smartplug

Github

https://github.com/softScheck/tplink-smartplug

Github

https://github.com/machinekoder/UNI-PRINT-3D

Github

http://github.com/machinekoder/QtQuickVcp/

Github

http://github.com/machinekoder/Machineface/

Credits

Alexander Rössler

Alexander Rössler

2 projects • 0 followers
Open source enthusiast and co-founder of the machinekit.io machine controls software project.

Comments