Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion API_changes.rst
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
API changes
===========
Versions (X.Y.Z) where Z > 0 e.g. 3.0.1 do NOT have API changes!
Versions (X.Y.Z) where Z == 0 e.g. 3.0.1 do NOT have API changes!

API changes 3.13.0
------------------
- removed RemoteDeviceContext, because it only is a partial forwarder
a proper forwarder should be made at frame level.
- datastore get/setValues is removed,
please use server.async_get/setValues instead.
- datastore show a deprecation warning
- SimData/SimDevice have been updated

API changes 3.12.0
------------------
Expand Down
89 changes: 89 additions & 0 deletions examples/contrib/test_datastores.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
#!/usr/bin/env python3
"""Test datastores compatibility.

Control that:

- ModbusSequentialDataBlock
- ModbusSparseDataBlock

Works as the used to work.

Control that SimData/SimDevice:

- works as intended in both shared and non shared mode.

Focussing on coils and discrete inputs.
"""
import asyncio

import pymodbus.client as modbusClient
from pymodbus import FramerType
from pymodbus.datastore import (
ModbusDeviceContext,
ModbusSequentialDataBlock,
ModbusServerContext,
)
from pymodbus.logging import Log, pymodbus_apply_logging_config
from pymodbus.server import ServerAsyncStop, StartAsyncTcpServer
from pymodbus.transport import NULLMODEM_HOST


async def run_async(port, context, run_test):
"""Run server setup."""
Log.info("### start ASYNC server")
task = asyncio.create_task(StartAsyncTcpServer(
context=context,
address=(NULLMODEM_HOST, port),
framer=FramerType.SOCKET
))
await asyncio.sleep(1)

Log.info("### Create client object")
client = modbusClient.AsyncModbusTcpClient(
NULLMODEM_HOST,
port=5020,
framer=FramerType.SOCKET,
)
await client.connect()
await run_test(client)
client.close()
await ServerAsyncStop()
task.cancel()
await task


async def run_sequential(port) -> None:
"""Combine setup and run."""
async def run_old_test(client):
"""Run test."""
Log.info("### read_coils")
single = [True] + [False] * 7
register = [True] * 7 + [False] * 8 + [True]
rr = await client.read_coils(0, count=1, device_id=1)
assert rr.bits == single
rr = await client.read_coils(0, count=16, device_id=1)
assert rr.bits == register
rr = await client.read_discrete_inputs(0, count=1, device_id=1)
assert rr.bits == single
rr = await client.read_discrete_inputs(0, count=16, device_id=1)
assert rr.bits == register

co_data = [17] * 7 + [0] * 8 + [1]
di_data = [True] * 7 + [False] * 8 + [True]
context = ModbusServerContext(devices=ModbusDeviceContext(
co=ModbusSequentialDataBlock(1, co_data),
di=ModbusSequentialDataBlock(1, di_data),
),
single=True
)
Log.info("Run sequential test.")
await run_async(port, context, run_old_test)


async def run_all(port):
"""Run all tests."""
pymodbus_apply_logging_config("info")
await run_sequential(port)

if __name__ == "__main__":
asyncio.run(run_all(5020), debug=True)
86 changes: 45 additions & 41 deletions pymodbus/simulator/simdata.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from dataclasses import dataclass
from typing import TypeAlias, cast

from ..pdu.utils import pack_bitstring, unpack_bitstring
from .simutils import DataType, SimUtils


Expand Down Expand Up @@ -73,18 +72,25 @@ class SimData:
SimData(
address=100,
values=0xffff,
datatype=DataType.BITS
datatype=DataType.REGISTERS
)
SimData(
address=100,
values=[0xffff],
datatype=DataType.BITS
datatype=DataType.REGISTERS
)

Each SimData defines 16 BITS (coils), with value True.

Value are stored in registers (16bit is 1 register), the address refers to the register, unless
in non-shared mode where the address refers to the coil.
Value are stored in registers (16bit is 1 register).

In shared mode (coil and discrete inputs requests):
- address refers to the register, containing individual bits,
Individual bits within the register cannot be addressed,
unless "use_bit_as_address" is set on the device.

In non-shared mode (coil and discrete inputs requests)
- address refers to the bit.
"""

#: Address of first register, starting with 0 (identical to the requests)
Expand All @@ -111,6 +117,11 @@ class SimData:
#: Used to check access and convert value to/from registers or mark as invalid.
datatype: DataType = DataType.INVALID

#: String encoding
#:
#: Used to convert a SimData(DataType.STRING) to registers.
string_encoding: str = "utf-8"

#: Mark register(s) as readonly.
readonly: bool = False

Expand All @@ -129,15 +140,21 @@ def __check_simple(self):
raise TypeError("values= cannot be used with invalid=True")
if isinstance(self.values, list) and not self.values:
raise TypeError("values= list cannot be empty")
try:
"test string".encode(self.string_encoding)
except (UnicodeEncodeError, LookupError) as exc:
raise TypeError("string_encoding= not valid") from exc

def __check_parameters(self):
"""Check all parameters."""
self.__check_simple()
x_values = self.values if isinstance(self.values, list) else [self.values]
x_datatype, _x_struct, _x_len = SimUtils.DATATYPE_STRUCT[self.datatype]
if self.datatype == DataType.BITS:
x_datatype = int if isinstance(x_values[0], int) else bool
x_datatype = bool if isinstance(x_values[0], bool) else int
for x_value in x_values:
if self.datatype == DataType.BITS and x_datatype is int and isinstance(x_value, bool):
raise TypeError(f"values= {x_value} int and bool cannot be mixed")
if not isinstance(x_value, x_datatype):
raise TypeError(f"values= {x_value} is not {x_datatype!s}")
if x_datatype is str and not x_value:
Expand All @@ -147,60 +164,47 @@ def __post_init__(self):
"""Define a group of registers."""
self.__check_parameters()

def build_registers_bits_block(self) -> list[int]:
def build_registers_bits_block(self) -> list[bool]:
"""Convert values= to registers from bits (1 bit in each register)."""
x_values = self.values if isinstance(self.values, list) else [self.values]
regs: list[int] = []
if isinstance(x_values[0], bool):
for v in x_values:
regs.append(1 if v else 0)
else:
for v in cast(list[int], x_values):
bool_list = unpack_bitstring(v.to_bytes(2, byteorder="big"))
for i in bool_list:
regs.append(1 if i else 0)
return regs

def build_registers_bits_shared(self, endian: tuple[bool, bool]) -> list[int]:
return cast(list[bool], x_values)
return SimUtils.registersToBits(cast(list[int], x_values))

def build_registers_bits_shared(self) -> list[int]:
"""Convert values= to registers from bits (16 bits in each register)."""
x_values = self.values if isinstance(self.values, list) else [self.values]
if isinstance(x_values[0], bool):
if len(x_values) % 16:
raise TypeError(f"SimData address={self.address} values= must be a multiple of 16")
bytes_bits = bytearray(pack_bitstring(cast(list[bool], x_values)))
else:
bytes_bits = bytearray()
for v in x_values:
bytes_bits.extend(struct.pack(">H", v))
return SimUtils.convert_bytes_registers(bytes_bits, endian[0], endian[1], 1)

def build_registers_string(self, endian: tuple[bool, bool], string_encoding: str) -> list[int]:
if not isinstance(x_values[0], bool):
return cast(list[int], x_values)
if len(x_values) % 16:
raise TypeError(f"SimData address={self.address} values= must be a multiple of 16")
return SimUtils.bitsToRegisters(cast(list[bool], x_values))

def build_registers_string(self) -> list[int]:
"""Convert values= to registers from string(s)."""
x_values = self.values if isinstance(self.values, list) else [self.values]
blocks_regs: list[int] = []
for value in x_values:
bytes_string = cast(str, value).encode(string_encoding)
if len(bytes_string) % 2:
bytes_string += b"\x00"
blocks_regs.extend(SimUtils.convert_bytes_registers(bytearray(bytes_string), endian[0], endian[1], 1))
bytes_string = cast(str, value).encode(self.string_encoding)
regs = SimUtils.bytesToRegisters(bytes_string)
blocks_regs.extend(regs)
return blocks_regs


def build_registers(self, endian: tuple[bool, bool], string_encoding: str, block_bits: bool) -> list[int]:
def build_registers(self, block_bits: bool) -> list[int] | list[bool]:
"""Convert values= to registers."""
self.__check_parameters()
if self.datatype == DataType.STRING:
block_regs = self.build_registers_string(endian, string_encoding)
return block_regs * self.count
return self.build_registers_string() * self.count
if block_bits:
return self.build_registers_bits_block() * self.count
if self.datatype == DataType.BITS:
if block_bits:
return self.build_registers_bits_block()
return self.build_registers_bits_shared(endian)
return self.build_registers_bits_shared() * self.count

x_values = self.values if isinstance(self.values, list) else [self.values]
_x_datatype, x_struct, x_len = SimUtils.DATATYPE_STRUCT[self.datatype]
_x_datatype, x_struct, _x_len = SimUtils.DATATYPE_STRUCT[self.datatype]
blocks_regs: list[int] = []
for v in x_values:
byte_list = struct.pack(f">{x_struct}", v)
blocks_regs.extend(SimUtils.convert_bytes_registers(bytearray(byte_list), endian[0], endian[1], x_len))
blocks_regs.extend(SimUtils.bytesToRegisters(byte_list))
return blocks_regs
Loading
Loading