Utility.py is a Python script that implements a Modbus TCP server to communicate with a smart meter, an electronic device that measures electrical energy consumption. Here’s a breakdown of what the script does:
- The code imports the necessary modules to implement a Modbus TCP server using the pyModbusTCP library.
- It defines a class called “Registers” that stores the wattage values received from the smart meter in a nested list, with one inner list per smart meter, for later processing.
- The “main” function starts the Modbus TCP server and then starts two threads: “register0” and “sync_check”. The first reads the wattage values delivered by the smart meter, and the second checks whether the smart meter is still responding.
- The “sync_check” function runs in a while loop. Every 10 seconds it checks whether a new value has arrived from the smart meter within the last 10 seconds. If not, it prints an alert.
- The “register0” function also runs in a while loop. It reads 32 coils and one holding register from the server’s data bank, which the smart meter fills in by writing to the server over Modbus. The 32 coil bits are joined into a binary string, converted to 4 bytes, and unpacked into a float representing the wattage, which is rounded to 5 decimal places and stored in the “mainlist” attribute of the “Registers” class. The holding register carries a synchronization value, “recv_syncint”, which the function compares with the last value it saw, “synchronizer”. A larger value means a new reading has arrived. A smaller value indicates that the smart meter was rebooted or there was a delay on the path, so the script prints an alert and still stores the reading. In both cases “synchronizer” is updated and the timer used by “sync_check” is reset.
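The coil-to-float conversion described in the last bullet can be tried on its own, without a Modbus server. The following is a minimal sketch, not the script’s exact code: the helper names “float_to_coils” and “coils_to_float” are hypothetical, and it assumes the bits are sent most significant bit first as a big-endian IEEE 754 single-precision float.

```python
import struct


def float_to_coils(value):
    """Pack a float into 32 booleans, most significant bit first (assumed layout)."""
    as_int = int.from_bytes(struct.pack('>f', value), byteorder='big')
    return [bool((as_int >> shift) & 1) for shift in range(31, -1, -1)]


def coils_to_float(coils):
    """Rebuild the float from 32 booleans, most significant bit first."""
    if len(coils) != 32:
        raise ValueError("expected exactly 32 coils")
    # Same steps as the script: bits -> binary string -> 4 bytes -> float
    binary_string = ''.join('1' if bit else '0' for bit in coils)
    raw = int(binary_string, 2).to_bytes(4, byteorder='big')
    return struct.unpack('>f', raw)[0]


if __name__ == '__main__':
    coils = float_to_coils(230.5)
    print(coils_to_float(coils))
```

The sketch uses the explicit '>f' format so that the byte order matches the big-endian bytes produced by to_bytes. The script itself uses 'f', which follows the byte order of the machine it runs on, so the sending side has to pack its bits to match.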
Overall, this script monitors the energy consumption reported by a smart meter by reading its wattage values and storing them in a list for later processing. It also checks whether the smart meter is responding in a timely manner and prints an alert if it isn’t.
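The two checks the script relies on, the synchronization comparison and the 10-second timeout, can be written as small pure functions that are easy to test. This is a sketch under assumptions: the names “classify_sync” and “is_stale” are hypothetical and do not appear in the script.

```python
import time


def classify_sync(received, previous):
    """Compare the received synchronization value with the last one seen."""
    if received > previous:
        return "new"        # a fresh reading has arrived
    if received < previous:
        return "reset"      # meter rebooted or there was a delay on the path
    return "unchanged"      # same value as before, nothing to store


def is_stale(last_update, now, timeout=10.0):
    """True if more than `timeout` seconds passed since the last reading."""
    return now - last_update > timeout


if __name__ == '__main__':
    print(classify_sync(received=8, previous=7))
    print(is_stale(last_update=time.time() - 15, now=time.time()))
```

One thing to keep in mind with this scheme: a Modbus holding register is 16 bits wide, so a counter stored in a single register wraps back to 0 after 65535, and the comparison would report that wrap as a reboot.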
from pyModbusTCP.server import ModbusServer
import threading, struct, time

server = ModbusServer(host="192.168.10.60", port=502, no_block=True)
last_updated_time = time.time()  # defined up front so sync_check can always read it


class Registers:
    # wattage gets stored here for future processes
    # each list is for each thread i.e. one smart meter
    mainlist = [[], []]


def main():
    print("[+] INITIATING UTILITY [+]")
    server.start()
    try:
        threading.Thread(target=register0, daemon=False).start()
        threading.Thread(target=sync_check, daemon=False).start()
    except Exception:
        exit()


def sync_check():
    while True:
        # Check if 10 seconds have passed without a new value received
        if time.time() - last_updated_time > 10:
            print("Alert: The SM is not responding")
        time.sleep(10)


def register0():
    # Register 0 for smart meter 1
    synchronizer = 0
    global last_updated_time
    last_updated_time = time.time()
    while True:
        coils = server.data_bank.get_coils(0, 32)
        recv_sync = server.data_bank.get_holding_registers(0, 1)
        recv_syncint = recv_sync[0]  # get_holding_registers returns a list
        if recv_syncint == synchronizer:
            continue  # no new value yet
        if recv_syncint < synchronizer:
            print("SM was rebooted or there was a delay on path")
        # Join the 32 coil bits into a binary string, then into 4 bytes
        boolean_list = coils[:32]
        binary_string = ''.join('1' if x else '0' for x in boolean_list)
        raw = int(binary_string, 2).to_bytes(4, byteorder='big')
        # struct.unpack returns a tuple; 'f' uses the machine's native byte order
        watts = struct.unpack('f', raw)[0]
        watts = round(watts, 5)
        synchronizer = recv_syncint
        Registers.mainlist[0].append(watts)  # list 0 belongs to smart meter 1
        last_updated_time = time.time()  # Reset timer on new value received
        print(watts, synchronizer)


if __name__ == '__main__':
    main()