#!/usr/bin/env python3
"""
Modbus Device Configuration Changer

Changes voltage configuration parameters on a Modbus device following a
specific workflow: unlock with a password, wait for the expected main state,
update the PFC limits, commit them, then check the final main state.
"""

import argparse
import sys
import time
from typing import Optional

import minimalmodbus

# Modbus function codes
MODBUS_FC_READ_COILS = 0x01
MODBUS_FC_READ_HOLDING_REGISTERS = 0x03
MODBUS_FC_READ_INPUT_REGISTERS = 0x04
MODBUS_FC_WRITE_SINGLE_COIL = 0x05
MODBUS_FC_WRITE_SINGLE_REGISTER = 0x06

# Device configuration from config.json
SLAVE_ID = 0x10  # 16 in decimal

# Register addresses
PASSWORD_LO_ADDR = 92
PASSWORD_HI_ADDR = 93
MAIN_STATE_ADDR = 61
PFC_VAC_MIN_ADDR = 56
PFC_VAC_MAX_ADDR = 58
PFC_CURRENT_MAX_ADDR = 48
CFG_REWRITE_ADDR = 49  # Assuming this address based on pattern

# Password values
PASSWORD_LO_VALUE = 0xBEEF
PASSWORD_HI_VALUE = 0xFEED

# Voltage configurations
VOLTAGE_CONFIGS = {
    '110V': {
        'PFC_Vac_min': 90.000000,
        'PFC_Vac_max': 150.000000,
        'PFC_Current_max': 40.000000
    },
    '220V': {
        'PFC_Vac_min': 200.000000,
        'PFC_Vac_max': 270.000000,
        'PFC_Current_max': 30.000000
    }
}


class ModbusConfigChanger:
    """
    Class to handle Modbus device configuration changes

    Wraps a single ``minimalmodbus.Instrument`` addressed at ``SLAVE_ID`` and
    exposes one method per workflow step. Every step reports progress with
    ``print`` and signals failure through its return value instead of raising.
    """

    def __init__(self, port, baudrate=9600):
        """
        Initialize Modbus connection

        Terminates the process with exit status 1 if the instrument cannot
        be created.

        Args:
            port (str): COM port for Modbus communication
            baudrate (int): Baudrate for communication (default: 9600)
        """
        try:
            self.instrument = minimalmodbus.Instrument(port, SLAVE_ID, mode='rtu')
            self.instrument.serial.baudrate = baudrate
            self.instrument.serial.timeout = 2
            print(f"Connected to Modbus device on {port} at {baudrate} baud")
        except Exception as e:
            print(f"Error connecting to Modbus device: {e}")
            sys.exit(1)

    def set_passwords(self) -> bool:
        """
        Set password low and high values

        Writes ``PASSWORD_LO_VALUE`` and then ``PASSWORD_HI_VALUE`` to their
        registers using function code 0x06.

        Returns:
            bool: True if both writes succeeded, False otherwise
        """
        try:
            print("Setting passwords...")
            self.instrument.write_register(
                PASSWORD_LO_ADDR,
                PASSWORD_LO_VALUE,
                functioncode=MODBUS_FC_WRITE_SINGLE_REGISTER,
            )
            print(f"Password LOW set to 0x{PASSWORD_LO_VALUE:04X}")
            self.instrument.write_register(
                PASSWORD_HI_ADDR,
                PASSWORD_HI_VALUE,
                functioncode=MODBUS_FC_WRITE_SINGLE_REGISTER,
            )
            print(f"Password HIGH set to 0x{PASSWORD_HI_VALUE:04X}")
        except Exception as e:
            print(f"Error setting passwords: {e}")
            return False
        return True

    def wait_for_main_state(self, target_state, timeout=15) -> bool:
        """
        Wait for main state to reach target value

        Polls the main-state input register about once per second. Read
        errors are reported and polling continues until the timeout expires.

        Args:
            target_state (int): Target state value
            timeout (int): Timeout in seconds

        Returns:
            bool: True if target state reached, False if timeout
        """
        print(f"Waiting for main state to reach {target_state}...")
        start_time = time.time()
        while time.time() - start_time < timeout:
            try:
                current_state = self.instrument.read_register(
                    MAIN_STATE_ADDR,
                    functioncode=MODBUS_FC_READ_INPUT_REGISTERS,
                )
                print(f"Current main state: {current_state}")
                if current_state == target_state:
                    print(f"Main state reached target value: {target_state}")
                    return True
                time.sleep(1)  # Wait 1 second before checking again
            except Exception as e:
                print(f"Error reading main state: {e}")
                time.sleep(1)
        print(f"Timeout waiting for main state {target_state}")
        return False

    def read_float_register(self, address) -> Optional[float]:
        """
        Read a float value from holding register

        Args:
            address (int): Register address

        Returns:
            float: Register value or None if error
        """
        try:
            # Read 2 consecutive registers for float (32-bit)
            return self.instrument.read_float(
                address,
                functioncode=MODBUS_FC_READ_HOLDING_REGISTERS,
            )
        except Exception as e:
            print(f"Error reading float register {address}: {e}")
            return None

    def write_float_register(self, address, value) -> bool:
        """
        Write a float value to holding register

        Args:
            address (int): Register address
            value (float): Value to write

        Returns:
            bool: True if successful, False otherwise
        """
        try:
            self.instrument.write_float(address, value)
            return True
        except Exception as e:
            print(f"Error writing float register {address}: {e}")
            return False

    def check_and_update_config(self, voltage_config) -> bool:
        """
        Check current configuration and update if needed

        Each parameter is read; if it differs from the target it is written
        and then read back for verification.

        Args:
            voltage_config (dict): Target configuration values, keyed by
                'PFC_Vac_min', 'PFC_Vac_max' and 'PFC_Current_max'

        Returns:
            bool: True if every parameter was readable and either already
                matched or was updated and verified, False otherwise
        """
        print("Checking current configuration...")
        config_map = {
            'PFC_Vac_min': PFC_VAC_MIN_ADDR,
            'PFC_Vac_max': PFC_VAC_MAX_ADDR,
            'PFC_Current_max': PFC_CURRENT_MAX_ADDR
        }
        all_correct = True

        for param_name, target_value in voltage_config.items():
            address = config_map[param_name]
            current_value = self.read_float_register(address)
            if current_value is None:
                # Unreadable parameter: count it as a failure, keep checking the rest.
                all_correct = False
                continue

            print(f" {param_name}: Current = {current_value:.1f}, Target = {target_value:.1f}")

            if abs(current_value - target_value) <= 0.01:  # Allow small tolerance
                print(f"✓ {param_name} already at target value")
                continue

            print(f"> Updating {param_name} from {current_value:.1f} to {target_value:.1f}")
            if not self.write_float_register(address, target_value):
                print(f"✗ Failed to update {param_name}")
                all_correct = False
                continue

            # Read back to verify (the readback tolerance is looser than the
            # comparison above: 0.1 versus 0.01).
            readback_value = self.read_float_register(address)
            if readback_value is not None and abs(readback_value - target_value) <= 0.1:
                print(f"✓ {param_name} successfully updated and verified")
            else:
                print(f"✗ {param_name} verification failed")
                all_correct = False

        return all_correct

    def commit_configuration(self) -> bool:
        """
        Commit configuration to flash memory

        Writes 1 to the coil at ``CFG_REWRITE_ADDR`` using function code 0x05.

        Returns:
            bool: True if the write was acknowledged or the device sent no
                reply at all, False if any other error was reported
        """
        print("Committing configuration to flash memory...")
        try:
            self.instrument.write_bit(
                CFG_REWRITE_ADDR,
                1,
                functioncode=MODBUS_FC_WRITE_SINGLE_COIL,
            )
        except minimalmodbus.NoResponseError as e:
            # NOTE(review): the previous implementation treated every failure
            # here as success, presumably because the device stops answering
            # while it commits - TODO confirm on hardware. Only the
            # "no reply" case keeps that lenient behaviour.
            print(f"No response to commit command, continuing: {e}")
            return True
        except Exception as e:
            print(f"Error committing configuration: {e}")
            return False
        return True

    def close_connection(self):
        """
        Close Modbus connection

        Closes the underlying serial port. Errors while closing are reported
        but never raised, so this is safe to call from a ``finally`` block.
        """
        try:
            serial_port = getattr(self.instrument, "serial", None)
            if serial_port is not None:
                serial_port.close()
        except Exception as e:
            print(f"Error closing Modbus connection: {e}")


def main() -> bool:
    """
    Main function to execute the configuration workflow

    Returns:
        bool: True if the workflow ran to completion, False if a step
            failed, the user interrupted it, or an unexpected error occurred
    """
    # Parse command line arguments
    parser = argparse.ArgumentParser(description='Modbus Device Configuration Changer')
    parser.add_argument('-p', '--port', required=True,
                        help='COM port (e.g., COM3, /dev/ttyUSB0)')
    parser.add_argument('-v', '--voltage', required=True, choices=['110V', '220V'],
                        help='Voltage configuration (110V or 220V)')
    args = parser.parse_args()

    com_port = args.port
    voltage_option = args.voltage

    print(f"Starting Modbus configuration change for {voltage_option} on {com_port}")
    print("=" * 60)

    # Get target configuration
    target_config = VOLTAGE_CONFIGS[voltage_option]
    print(f"Target configuration for {voltage_option}:")
    for param, value in target_config.items():
        print(f" {param}: {value}")
    print("=" * 60)

    # Initialize Modbus connection (exits the process on failure)
    modbus = ModbusConfigChanger(com_port)

    try:
        # Step 1: Set passwords
        if not modbus.set_passwords():
            print("Failed to set passwords. Exiting.")
            return False

        # Step 2: Wait for main state to be 15
        if not modbus.wait_for_main_state(15):
            print("Failed to reach main state 15. Exiting.")
            return False

        # Step 3: Check and update configuration
        if not modbus.check_and_update_config(target_config):
            print("Configuration update failed. Exiting.")
            return False

        # Step 4: Wait for device restart
        print("Waiting 10 seconds for device restart...")
        time.sleep(10)

        # Step 5: Commit configuration to flash
        if not modbus.commit_configuration():
            print("Failed to commit configuration. Exiting.")
            return False
        print("Finish configuration")

        # Step 6: Check final main state (should be 1 or 2)
        print("Checking final main state...")
        final_state = None
        for _ in range(10):  # Try for 10 seconds
            try:
                final_state = modbus.instrument.read_register(
                    MAIN_STATE_ADDR,
                    functioncode=MODBUS_FC_READ_INPUT_REGISTERS,
                )
                print(f"Final main state: {final_state}")
                if final_state in (1, 2):
                    print("✓ Device configuration completed successfully!")
                    break
            except Exception as e:
                # Keep polling, but do not hide the reason a read failed.
                print(f"Error reading main state: {e}")
            time.sleep(1)

        if final_state not in (1, 2):
            print("Warning: Final main state is not 1 or 2. Configuration may not be complete.")

        print("=" * 60)
        print("Configuration change workflow completed!")
        return True

    except KeyboardInterrupt:
        print("\nOperation interrupted by user.")
        return False
    except Exception as e:
        print(f"Unexpected error: {e}")
        return False
    finally:
        modbus.close_connection()


if __name__ == "__main__":
    # Propagate the workflow result as the process exit status.
    sys.exit(0 if main() else 1)