#!/usr/bin/env python3
"""
Modbus Device Configuration Changer

Changes voltage configuration parameters on a Modbus device following a specific workflow.
"""
|
|
|
|
import minimalmodbus
|
|
import time
|
|
import argparse
|
|
import sys
|
|
|
|
# --- Modbus function codes (decimal; 0x01, 0x03, 0x04, 0x05, 0x06) ---------
MODBUS_FC_READ_COILS = 1
MODBUS_FC_READ_HOLDING_REGISTERS = 3
MODBUS_FC_READ_INPUT_REGISTERS = 4
MODBUS_FC_WRITE_SINGLE_COIL = 5
MODBUS_FC_WRITE_SINGLE_REGISTER = 6

# --- Device configuration from config.json ---------------------------------
SLAVE_ID = 16  # 0x10

# --- Register addresses -----------------------------------------------------
PFC_CURRENT_MAX_ADDR = 48
CFG_REWRITE_ADDR = 49  # Assuming this address based on pattern
PFC_VAC_MIN_ADDR = 56
PFC_VAC_MAX_ADDR = 58
MAIN_STATE_ADDR = 61
PASSWORD_LO_ADDR = 92
PASSWORD_HI_ADDR = 93

# --- Password values (low word, high word) ----------------------------------
PASSWORD_LO_VALUE = 0xBEEF
PASSWORD_HI_VALUE = 0xFEED

# --- Voltage configurations -------------------------------------------------
# One target parameter set per supported voltage option. The inner key order
# is the order in which the parameters are checked and written.
VOLTAGE_CONFIGS = {
    '110V': {
        'PFC_Vac_min': 90.0,
        'PFC_Vac_max': 150.0,
        'PFC_Current_max': 40.0,
    },
    '220V': {
        'PFC_Vac_min': 200.0,
        'PFC_Vac_max': 270.0,
        'PFC_Current_max': 30.0,
    },
}
|
|
|
|
class ModbusConfigChanger:
    """
    Class to handle Modbus device configuration changes

    Wraps a ``minimalmodbus.Instrument`` and exposes the individual steps of
    the configuration workflow. Each step prints its progress to stdout and
    reports failure through its return value.
    """

    def __init__(self, port, baudrate=9600):
        """
        Initialize Modbus connection

        Creates an RTU instrument for slave ``SLAVE_ID`` and applies the
        baudrate and a 2 second serial timeout.

        Args:
            port (str): COM port for Modbus communication
            baudrate (int): Baudrate for communication (default: 9600)

        Raises:
            SystemExit: With status 1 if the instrument cannot be created.
        """
        try:
            self.instrument = minimalmodbus.Instrument(port, SLAVE_ID, mode='rtu')
            self.instrument.serial.baudrate = baudrate
            self.instrument.serial.timeout = 2
            print(f"Connected to Modbus device on {port} at {baudrate} baud")
        except Exception as e:
            print(f"Error connecting to Modbus device: {e}")
            sys.exit(1)

    def set_passwords(self):
        """
        Set password low and high values

        Writes ``PASSWORD_LO_VALUE`` and ``PASSWORD_HI_VALUE`` to their
        registers with the write-single-register function code, low word
        first.

        Returns:
            bool: True if both writes succeeded, False otherwise
        """
        try:
            print("Setting passwords...")
            self.instrument.write_register(PASSWORD_LO_ADDR, PASSWORD_LO_VALUE,
                                           functioncode=MODBUS_FC_WRITE_SINGLE_REGISTER)
            print(f"Password LOW set to 0x{PASSWORD_LO_VALUE:04X}")

            self.instrument.write_register(PASSWORD_HI_ADDR, PASSWORD_HI_VALUE,
                                           functioncode=MODBUS_FC_WRITE_SINGLE_REGISTER)
            print(f"Password HIGH set to 0x{PASSWORD_HI_VALUE:04X}")
        except Exception as e:
            print(f"Error setting passwords: {e}")
            return False
        return True

    def wait_for_main_state(self, target_state, timeout=15):
        """
        Wait for main state to reach target value

        Polls the main-state input register about once per second. Read
        errors are reported and the poll is retried until the timeout.

        Args:
            target_state (int): Target state value
            timeout (int): Timeout in seconds

        Returns:
            bool: True if target state reached, False if timeout
        """
        print(f"Waiting for main state to reach {target_state}...")
        # Monotonic clock: the timeout must not be affected by wall-clock
        # adjustments while we are polling.
        deadline = time.monotonic() + timeout

        while time.monotonic() < deadline:
            try:
                current_state = self.instrument.read_register(
                    MAIN_STATE_ADDR,
                    functioncode=MODBUS_FC_READ_INPUT_REGISTERS)
            except Exception as e:
                print(f"Error reading main state: {e}")
            else:
                print(f"Current main state: {current_state}")
                if current_state == target_state:
                    print(f"Main state reached target value: {target_state}")
                    return True

            time.sleep(1)  # Wait 1 second before checking again

        print(f"Timeout waiting for main state {target_state}")
        return False

    def read_float_register(self, address):
        """
        Read a float value from holding register

        Args:
            address (int): Register address

        Returns:
            float: Register value or None if error
        """
        try:
            # Read 2 consecutive registers for float (32-bit)
            return self.instrument.read_float(
                address, functioncode=MODBUS_FC_READ_HOLDING_REGISTERS)
        except Exception as e:
            print(f"Error reading float register {address}: {e}")
            return None

    def write_float_register(self, address, value):
        """
        Write a float value to holding register

        Args:
            address (int): Register address
            value (float): Value to write

        Returns:
            bool: True if successful, False otherwise
        """
        try:
            self.instrument.write_float(address, value)
            return True
        except Exception as e:
            print(f"Error writing float register {address}: {e}")
            return False

    def check_and_update_config(self, voltage_config):
        """
        Check current configuration and update if needed

        For every parameter the current value is read; if it differs from the
        target by more than 0.1 the target is written and read back to verify.
        All parameters are processed even after one of them fails.

        Args:
            voltage_config (dict): Target configuration values

        Returns:
            bool: True if all configurations are correct, False otherwise
        """
        print("Checking current configuration...")

        config_map = {
            'PFC_Vac_min': PFC_VAC_MIN_ADDR,
            'PFC_Vac_max': PFC_VAC_MAX_ADDR,
            'PFC_Current_max': PFC_CURRENT_MAX_ADDR
        }
        tolerance = 0.1  # Allow small tolerance

        all_correct = True

        for param_name, target_value in voltage_config.items():
            address = config_map.get(param_name)
            if address is None:
                # A parameter without a known register cannot be verified, so
                # the configuration as a whole cannot be reported as correct.
                print(f"✗ Unknown configuration parameter: {param_name}")
                all_correct = False
                continue

            current_value = self.read_float_register(address)
            if current_value is None:
                all_correct = False
                continue

            print(f" {param_name}: Current = {current_value:.1f}, Target = {target_value:.1f}")

            if abs(current_value - target_value) <= tolerance:
                print(f"✓ {param_name} already at target value")
                continue

            print(f"> Updating {param_name} from {current_value:.1f} to {target_value:.1f}")
            if not self.write_float_register(address, target_value):
                print(f"✗ Failed to update {param_name}")
                all_correct = False
                continue

            # Read back to verify
            readback_value = self.read_float_register(address)
            if readback_value is not None and abs(readback_value - target_value) <= tolerance:
                print(f"✓ {param_name} successfully updated and verified")
            else:
                print(f"✗ {param_name} verification failed")
                all_correct = False

        return all_correct

    def commit_configuration(self):
        """
        Commit configuration to flash memory

        Sets the coil at ``CFG_REWRITE_ADDR`` to 1 with the write-single-coil
        function code.

        Returns:
            bool: True if successful, False otherwise
        """
        try:
            print("Committing configuration to flash memory...")
            self.instrument.write_bit(CFG_REWRITE_ADDR, 1,
                                      functioncode=MODBUS_FC_WRITE_SINGLE_COIL)
        except Exception as e:
            print(f"Error committing configuration: {e}")
            return False
        return True

    def close_connection(self):
        """
        Close Modbus connection

        Closes the instrument's serial port. This is best-effort cleanup: a
        missing instrument/port is ignored and a failing close is reported
        but never raised.
        """
        serial_port = getattr(getattr(self, "instrument", None), "serial", None)
        if serial_port is None:
            return
        try:
            serial_port.close()
        except Exception as e:
            print(f"Error closing Modbus connection: {e}")
|
|
|
|
def main(argv=None):
    """
    Main function to execute the configuration workflow

    Parses the command line, then runs the steps in order: set passwords,
    wait for main state 15, check/update the voltage parameters, wait 10
    seconds, commit the configuration, and finally poll the main state for
    up to 10 attempts expecting 1 or 2. The connection is always closed at
    the end.

    Args:
        argv (list[str] | None): Arguments to parse instead of
            ``sys.argv[1:]``; None (the default) uses the process arguments.

    Returns:
        bool: True if the workflow ran to completion (an unexpected final
        main state only produces a warning), False if a step failed, the
        user interrupted the run, or an unexpected error occurred.
    """
    # Parse command line arguments
    parser = argparse.ArgumentParser(description='Modbus Device Configuration Changer')
    parser.add_argument('-p', '--port', required=True, help='COM port (e.g., COM3, /dev/ttyUSB0)')
    parser.add_argument('-v', '--voltage', required=True, choices=['110V', '220V'],
                        help='Voltage configuration (110V or 220V)')
    parser.add_argument('-b', '--baudrate', type=int, default=9600,
                        help='Serial baudrate (default: 9600)')

    args = parser.parse_args(argv)

    com_port = args.port
    voltage_option = args.voltage

    print(f"Starting Modbus configuration change for {voltage_option} on {com_port}")
    print("=" * 60)

    # Get target configuration
    target_config = VOLTAGE_CONFIGS[voltage_option]
    print(f"Target configuration for {voltage_option}:")
    for param, value in target_config.items():
        print(f" {param}: {value}")
    print("=" * 60)

    # Initialize Modbus connection
    modbus = ModbusConfigChanger(com_port, args.baudrate)

    try:
        # Step 1: Set passwords
        if not modbus.set_passwords():
            print("Failed to set passwords. Exiting.")
            return False

        # Step 2: Wait for main state to be 15
        if not modbus.wait_for_main_state(15):
            print("Failed to reach main state 15. Exiting.")
            return False

        # Step 3: Check and update configuration
        if not modbus.check_and_update_config(target_config):
            print("Configuration update failed. Exiting.")
            return False

        # Step 4: Wait for device restart
        print("Waiting 10 seconds for device restart...")
        time.sleep(10)

        # Step 5: Commit configuration to flash
        if not modbus.commit_configuration():
            print("Failed to commit configuration. Exiting.")
            return False
        print("Finish configuration")

        # Step 6: Check final main state (should be 1 or 2)
        print("Checking final main state...")
        final_state = None
        for _ in range(10):  # Try for 10 seconds
            try:
                final_state = modbus.instrument.read_register(
                    MAIN_STATE_ADDR,
                    functioncode=MODBUS_FC_READ_INPUT_REGISTERS)
            except Exception as e:
                # Only communication errors are retried here; Ctrl-C must
                # still reach the KeyboardInterrupt handler below.
                print(f"Error reading final main state: {e}")
            else:
                print(f"Final main state: {final_state}")
                if final_state in (1, 2):
                    print("✓ Device configuration completed successfully!")
                    break
            time.sleep(1)

        if final_state not in (1, 2):
            print("Warning: Final main state is not 1 or 2. Configuration may not be complete.")

        print("=" * 60)
        print("Configuration change workflow completed!")
        return True

    except KeyboardInterrupt:
        print("\nOperation interrupted by user.")
        return False
    except Exception as e:
        print(f"Unexpected error: {e}")
        return False
    finally:
        modbus.close_connection()
|
|
|
|
if __name__ == "__main__":
    # main() returns False on every failure path; turn that into a non-zero
    # exit status so shells and scripts can detect a failed run. Any other
    # return value is treated as success.
    sys.exit(1 if main() is False else 0)