commit 744e51d63dc961062f42125b0fc585ab5056ccd5 Author: baolong Date: Thu Jun 5 19:43:29 2025 +0800 Upload files to "/" diff --git a/modbus-config-changer-v1.1.py b/modbus-config-changer-v1.1.py new file mode 100644 index 0000000..d0b2963 --- /dev/null +++ b/modbus-config-changer-v1.1.py @@ -0,0 +1,313 @@ +#!/usr/bin/env python3 +""" +Modbus Device Configuration Changer +Changes voltage configuration parameters on a Modbus device following a specific workflow. +""" + +import minimalmodbus +import time +import argparse +import sys + +# Modbus function codes +MODBUS_FC_READ_COILS = 0x01 +MODBUS_FC_READ_HOLDING_REGISTERS = 0x03 +MODBUS_FC_READ_INPUT_REGISTERS = 0x04 +MODBUS_FC_WRITE_SINGLE_COIL = 0x05 +MODBUS_FC_WRITE_SINGLE_REGISTER = 0x06 + +# Device configuration from config.json +SLAVE_ID = 0x10 # 16 in decimal + +# Register addresses +PASSWORD_LO_ADDR = 92 +PASSWORD_HI_ADDR = 93 +MAIN_STATE_ADDR = 61 +PFC_VAC_MIN_ADDR = 56 +PFC_VAC_MAX_ADDR = 58 +PFC_CURRENT_MAX_ADDR = 48 +CFG_REWRITE_ADDR = 49 # Assuming this address based on pattern + +# Password values +PASSWORD_LO_VALUE = 0xBEEF +PASSWORD_HI_VALUE = 0xFEED + +# Voltage configurations +VOLTAGE_CONFIGS = { + '110V': { + 'PFC_Vac_min': 90.000000, + 'PFC_Vac_max': 150.000000, + 'PFC_Current_max': 40.000000 + }, + '220V': { + 'PFC_Vac_min': 200.000000, + 'PFC_Vac_max': 270.000000, + 'PFC_Current_max': 30.000000 + } +} + +class ModbusConfigChanger: + """ + Class to handle Modbus device configuration changes + """ + + def __init__(self, port, baudrate=9600): + """ + Initialize Modbus connection + + Args: + port (str): COM port for Modbus communication + baudrate (int): Baudrate for communication (default: 9600) + """ + try: + self.instrument = minimalmodbus.Instrument(port, SLAVE_ID, mode='rtu') + self.instrument.serial.baudrate = baudrate + self.instrument.serial.timeout = 2 + print(f"Connected to Modbus device on {port} at {baudrate} baud") + except Exception as e: + print(f"Error connecting to Modbus device: {e}") + sys.exit(1) + + def set_passwords(self): + """ + Set password low and high values + """ + try: + print("Setting passwords...") + self.instrument.write_register(PASSWORD_LO_ADDR, PASSWORD_LO_VALUE, + functioncode=MODBUS_FC_WRITE_SINGLE_REGISTER) + print(f"Password LOW set to 0x{PASSWORD_LO_VALUE:04X}") + + self.instrument.write_register(PASSWORD_HI_ADDR, PASSWORD_HI_VALUE, + functioncode=MODBUS_FC_WRITE_SINGLE_REGISTER) + print(f"Password HIGH set to 0x{PASSWORD_HI_VALUE:04X}") + except Exception as e: + print(f"Error setting passwords: {e}") + return False + return True + + def wait_for_main_state(self, target_state, timeout=15): + """ + Wait for main state to reach target value + + Args: + target_state (int): Target state value + timeout (int): Timeout in seconds + + Returns: + bool: True if target state reached, False if timeout + """ + print(f"Waiting for main state to reach {target_state}...") + start_time = time.time() + + while time.time() - start_time < timeout: + try: + current_state = self.instrument.read_register(MAIN_STATE_ADDR, + functioncode=MODBUS_FC_READ_INPUT_REGISTERS) + print(f"Current main state: {current_state}") + + if current_state == target_state: + print(f"Main state reached target value: {target_state}") + return True + + time.sleep(1) # Wait 1 second before checking again + + except Exception as e: + print(f"Error reading main state: {e}") + time.sleep(1) + + print(f"Timeout waiting for main state {target_state}") + return False + + def read_float_register(self, address): + """ + Read a float value from holding register + + Args: + address (int): Register address + + Returns: + float: Register value or None if error + """ + try: + # Read 2 consecutive registers for float (32-bit) + value = self.instrument.read_float(address, functioncode=MODBUS_FC_READ_HOLDING_REGISTERS) + return value + except Exception as e: + print(f"Error reading float register {address}: {e}") + return None + + def write_float_register(self, address, value): + """ + Write a float value to holding register + + Args: + address (int): Register address + value (float): Value to write + + Returns: + bool: True if successful, False otherwise + """ + try: + self.instrument.write_float(address, value) + return True + except Exception as e: + print(f"Error writing float register {address}: {e}") + return False + + def check_and_update_config(self, voltage_config): + """ + Check current configuration and update if needed + + Args: + voltage_config (dict): Target configuration values + + Returns: + bool: True if all configurations are correct, False otherwise + """ + print("Checking current configuration...") + + config_map = { + 'PFC_Vac_min': PFC_VAC_MIN_ADDR, + 'PFC_Vac_max': PFC_VAC_MAX_ADDR, + 'PFC_Current_max': PFC_CURRENT_MAX_ADDR + } + + all_correct = True + + for param_name, target_value in voltage_config.items(): + address = config_map[param_name] + current_value = self.read_float_register(address) + + if current_value is None: + all_correct = False + continue + + print(f" {param_name}: Current = {current_value:.1f}, Target = {target_value:.1f}") + + if abs(current_value - target_value) > 0.1: # Allow small tolerance + print(f"> Updating {param_name} from {current_value:.1f} to {target_value:.1f}") + if self.write_float_register(address, target_value): + # Read back to verify + readback_value = self.read_float_register(address) + if readback_value is not None and abs(readback_value - target_value) <= 0.1: + print(f"✓ {param_name} successfully updated and verified") + else: + print(f"✗ {param_name} verification failed") + all_correct = False + else: + print(f"✗ Failed to update {param_name}") + all_correct = False + else: + print(f"✓ {param_name} already at target value") + + return all_correct + + def commit_configuration(self): + """ + Commit configuration to flash memory + + Returns: + bool: True if successful, False otherwise + """ + try: + print("Committing configuration to flash memory...") + self.instrument.write_bit(CFG_REWRITE_ADDR, 1, + functioncode=MODBUS_FC_WRITE_SINGLE_COIL) + return False + except Exception as e: + return True + + def close_connection(self): + """ + Close Modbus connection + """ + try: + self.instrument.close_port_after_each_call = True + except: + pass + +def main(): + """ + Main function to execute the configuration workflow + """ + # Parse command line arguments + parser = argparse.ArgumentParser(description='Modbus Device Configuration Changer') + parser.add_argument('-p', '--port', required=True, help='COM port (e.g., COM3, /dev/ttyUSB0)') + parser.add_argument('-v', '--voltage', required=True, choices=['110V', '220V'], + help='Voltage configuration (110V or 220V)') + + args = parser.parse_args() + + com_port = args.port + voltage_option = args.voltage + + print(f"Starting Modbus configuration change for {voltage_option} on {com_port}") + print("=" * 60) + + # Get target configuration + target_config = VOLTAGE_CONFIGS[voltage_option] + print(f"Target configuration for {voltage_option}:") + for param, value in target_config.items(): + print(f" {param}: {value}") + print("=" * 60) + + # Initialize Modbus connection + modbus = ModbusConfigChanger(com_port) + + try: + # Step 1: Set passwords + if not modbus.set_passwords(): + print("Failed to set passwords. Exiting.") + return False + + # Step 2: Wait for main state to be 15 + if not modbus.wait_for_main_state(15): + print("Failed to reach main state 15. Exiting.") + return False + + # Step 3: Check and update configuration + if not modbus.check_and_update_config(target_config): + print("Configuration update failed. Exiting.") + return False + + # Step 4: Wait for device restart + print("Waiting 10 seconds for device restart...") + time.sleep(10) + + # Step 5: Commit configuration to flash + if not modbus.commit_configuration(): + print("Failed to commit configuration. Exiting.") + return False + print("Finish configuration") + + # Step 6: Check final main state (should be 1 or 2) + print("Checking final main state...") + final_state = None + for attempt in range(10): # Try for 10 seconds + try: + final_state = modbus.instrument.read_register(MAIN_STATE_ADDR, + functioncode=MODBUS_FC_READ_INPUT_REGISTERS) + print(f"Final main state: {final_state}") + if final_state in [1, 2]: + print("✓ Device configuration completed successfully!") + break + except: + pass + time.sleep(1) + + if final_state not in [1, 2]: + print("Warning: Final main state is not 1 or 2. Configuration may not be complete.") + + print("=" * 60) + print("Configuration change workflow completed!") + + except KeyboardInterrupt: + print("\nOperation interrupted by user.") + except Exception as e: + print(f"Unexpected error: {e}") + finally: + modbus.close_connection() + +if __name__ == "__main__": + main() \ No newline at end of file