This is the code from the simulation project for my Smart Grid Security thesis. This post is about the Smart Meter (SM) device, which is designed to be mounted in a consumer’s household. It implements the Modbus TCP protocol using the pyModbusTCP library in Python 3.10.
In the other two posts in this category we are going to talk about the two firewall scripts and the Utility, the so-called Control Centre (CC), as well.

The code is written for the technicians who are supposed to mount the SM in the consumer’s household, so there is a simple command-line menu with input validation. Since we want to be able to collect watt hours in the background, the first option starts a thread and passes it the object we created when the program started. The loop in the Data class then begins simulating electricity consumption, and the program brings us back to the menu.
The second option sets up IPsec transport mode, which we are going to discuss in detail in another post. The important point is that the program won’t allow option 3 unless option 2 has been successfully initialized.
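The background collection described above can be sketched in isolation. This is a minimal illustration, not the project code: the Meter class, the lock, the start_collecting helper and the short sampling interval are all assumptions made for the example.

```python
import random
import threading
import time


class Meter:
    """Hypothetical stand-in for the Data class, with a single counter."""

    def __init__(self):
        self.watt_hours = 0.0
        self._lock = threading.Lock()  # assumption: added here to make reads safe

    def collect_forever(self, interval=0.01):
        # Runs in the background thread and keeps adding simulated consumption.
        while True:
            with self._lock:
                self.watt_hours += random.uniform(0.03, 0.07)
            time.sleep(interval)

    def read(self):
        with self._lock:
            return self.watt_hours


def start_collecting(meter):
    # daemon=True lets the program exit even though the loop never ends.
    worker = threading.Thread(target=meter.collect_forever, daemon=True)
    worker.start()
    return worker


meter = Meter()
worker = start_collecting(meter)  # returns immediately, like menu option 1
time.sleep(0.2)                   # the menu would be waiting for input here
collected = meter.read()
```

The call to start_collecting returns straight away, which is why the menu can be shown again while the counter keeps growing.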

As you can see, once IPsec transport mode is established, the SM can send watt hours to the control centre. The first value, 7.7066 watt hours, is larger because I started sending data a few seconds after I started data collection, and the thread had been collecting in the background. This supports grid self-healing: when there is an outage or downtime on the path to the control centre, consumption keeps accumulating so that the two sides stay synchronized.
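The idea of not losing consumption during downtime can be sketched as follows. This is a simplified illustration under assumptions: the Accumulator class and the send callbacks are hypothetical and are not part of the project code.

```python
class Accumulator:
    """Hypothetical counter that is only reset after a successful send."""

    def __init__(self):
        self.pending = 0.0

    def add(self, watt_hours):
        self.pending += watt_hours

    def flush(self, send):
        # send is any callable that returns True when the value was delivered.
        value = round(self.pending, 5)
        if send(value):
            self.pending = 0.0
            return True
        return False  # keep the value and try again later


received = []


def link_down(value):
    # Hypothetical transport while the path to the CC is unavailable.
    return False


def link_up(value):
    # Hypothetical transport once the path is back.
    received.append(value)
    return True


acc = Accumulator()
acc.add(0.5)
acc.flush(link_down)  # nothing is delivered, nothing is lost
acc.add(0.25)
acc.flush(link_up)    # delivers everything collected so far
```

After the second flush the control centre side has received a single value of 0.75, which covers both collection periods.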

Source code
This is the actual code, feel free to use it and play with it. The important aspect is the Modbus implementation. Once the simulated watt value, which is a floating-point number, reaches the loop in the sendData function, it has to go through a conversion. While the Modbus TCP session is open, the program rounds the float to 5 decimal places, because we don’t need more precision than that, and then uses the struct module to convert it to 4 binary bytes.
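As a small illustration of this rounding and packing step, the sketch below uses an arbitrary example value. The 'f' format is the one used in the code of this post; it produces a 4-byte single-precision float in the machine's native byte order, so the receiving side has to unpack it with the same format.

```python
import struct

watts = 7.706612345                      # arbitrary example value
rounded = round(watts, 5)                # keep 5 decimal places
float_bytes = struct.pack('f', rounded)  # 4 bytes, single precision

# Unpacking gives back a value that is close, but not bit-identical,
# because single precision holds only about 7 significant digits.
restored = struct.unpack('f', float_bytes)[0]
print(len(float_bytes), rounded, restored)
```

Because of the single-precision format, the restored number can differ from the rounded one in the last digits, which is worth keeping in mind when comparing values on both ends.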
The line that builds binary_string uses a generator expression to iterate over each byte in float_bytes, converts it to a binary string with the :08b format specifier in an f-string (which means “format as an 8-character zero-padded binary string”), and joins the resulting strings together into a single string of 0’s and 1’s using the join() method.
The next line, boolean_list = [bit == '1' for bit in binary_string], creates a list of boolean values, boolean_list, where each element corresponds to a bit in the binary string binary_string. If a bit in the binary string is equal to '1', then the corresponding element in boolean_list is True; otherwise, it is False.
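The two conversion steps can be tried on their own. The sketch below first shows the :08b specifier on a single byte value and then runs the same expressions as the post on the packed float 1.0; the variable names one_byte and four_bits are only used for this illustration.

```python
import struct

# The :08b specifier on one byte value: 5 becomes '00000101'.
one_byte = f'{5:08b}'

# A short bit string turned into booleans: [False, True, False, True].
four_bits = [bit == '1' for bit in '0101']

# The same steps as in the post, applied to a packed float.
float_bytes = struct.pack('f', 1.0)
binary_string = ''.join(f'{byte:08b}' for byte in float_bytes)
boolean_list = [bit == '1' for bit in binary_string]

print(one_byte, four_bits)
print(len(binary_string), len(boolean_list))  # 32 characters, 32 booleans
```

A 4-byte float always produces 32 booleans, so each reading occupies 32 coils.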
boolean_list is then passed as the second argument to the write_multiple_coils method, which accepts only a list of boolean values, in the same way that write_single_register accepts only an integer. Each of these methods writes to its own register address, given as the first argument; this SM uses address 0, an SM mounted somewhere else would use, for example, address 1, and so on. The synchronizer is an integer used for mapping values to prevent duplicates when the CC and SM are started at different times, and to support synchronization in case of downtime.
from pyModbusTCP.client import ModbusClient
import random, threading, time, subprocess, struct
######################################
##### Smart Meter Modbus client
######################################
class Data:
    def __init__(self):
        self.kettle = 0
        self.tv = 0
        self.fridge = 0

    def background_collect(self):
        try:
            while True:
                # watt hours simulation
                self.kettle = abs(random.uniform(0.0300, 0.0700) + self.kettle)
                self.tv = abs(random.uniform(0.0100, 0.0300) + self.tv)
                self.fridge = abs(random.uniform(0.0100, 0.0200) + self.fridge)
                time.sleep(1)  # to save computation power of CPU
        except Exception:
            print("Energy collection stopped")
            exit()

def sendData(collector):
    client = ModbusClient(host="10.0.0.1", auto_open=True, auto_close=False, port=502)
    while True:
        try:
            # open() is called once per attempt; the write results below
            # tell us whether the session is still usable
            if client.open():
                print("[+] MODBUS SESSION OPENED [+]")
                synchronizer = 1
                while True:
                    watts = collector.kettle + collector.tv + collector.fridge
                    watts = round(watts, 5)
                    print(watts)
                    # float -> 4 bytes -> 32-character bit string -> 32 booleans
                    float_bytes = struct.pack('f', watts)
                    binary_string = ''.join(f'{byte:08b}' for byte in float_bytes)
                    boolean_list = [bit == '1' for bit in binary_string]
                    # a falsy result means the write failed: keep the counters
                    # and leave the loop so the session is opened again
                    if not client.write_multiple_coils(0, boolean_list):
                        break
                    if not client.write_single_register(0, synchronizer):
                        break
                    synchronizer += 1
                    collector.kettle = 0
                    collector.tv = 0
                    collector.fridge = 0
                    time.sleep(5)
        except Exception as e:
            print("[+] MODBUS SESSION FAILED [+]")
            print(e)
        time.sleep(5)  # wait before the next attempt instead of retrying in a tight loop

def collect(collector):
    try:
        # daemon thread, so it does not keep the program alive on exit
        threading.Thread(target=collector.background_collect, daemon=True).start()
        print("[+]DATA COLLECTOR STARTED[+]")
    except Exception:
        print("[+]DATA COLLECTOR FAILED[+]")

def ipsec():
    try:
        subprocess.run(["swanctl", "--load-all"])
        # initiate the child SA once and check its exit status
        result = subprocess.run(["swanctl", "--initiate", "--child", "host-host"])
        if result.returncode == 0:
            print("[+] IPsec ESTABLISHED [+]")
            menu(sec_session=True)
        else:
            print("Establishment failed")
    except KeyboardInterrupt:
        print("Establishment interrupted")
    except Exception:
        print("Establishment failed")

######################################
##### Main menu initiators
######################################
def menu(sec_session=False):
    while True:
        print()
        print("[+]-- SMART METER MENU --[+]")
        print("1. Start collecting data")
        print("2. Secure connection")
        print("3. Start sending data")
        try:
            option = input("your choice: ")
            option = int(option)
            if option == 1:
                threading.Thread(target=collect, args=(collector,)).start()
            elif option == 2:
                ipsec()
            elif option == 3:
                # sending is only allowed once IPsec has been established
                if sec_session:
                    sendData(collector=collector)
                else:
                    print("IPsec is not established")
            else:
                print("Unknown option")
        except ValueError:
            print("Not an integer")
        except KeyboardInterrupt:
            exit()
        except Exception as exc:
            print(exc)

if __name__ == '__main__':
    collector = Data()
    time.sleep(1)
    menu()