|
1 | | -from paradox.connections.ip.parsers import IPMessageRequest, IPMessageResponse |
| 1 | +from paradox.connections.ip.parsers import ( |
| 2 | + IPMessageRequest, |
| 3 | + IPMessageResponse, |
| 4 | + IPPayloadConnectResponse, |
| 5 | +) |
2 | 6 |
|
3 | 7 |
|
4 | 8 | def test_IPMessageRequest_defaults(): |
@@ -64,3 +68,162 @@ def test_IPMessageResponse_defaults(): |
64 | 68 | assert data.header.wt == 0 |
65 | 69 | assert data.header.sb == 3 |
66 | 70 | assert data.payload == test_payload |
| 71 | + |
| 72 | + |
| 73 | +# --------------------------------------------------------------------------- |
| 74 | +# IPPayloadConnectResponse — PARSE tests |
| 75 | +# --------------------------------------------------------------------------- |
| 76 | +# Layout (25 bytes total): |
| 77 | +# Byte 0: login_status (Enum) |
| 78 | +# Bytes 1-16: key (16 bytes) |
| 79 | +# Bytes 17-18: hardware_version (Int16ub) |
| 80 | +# Byte 19: ip_firmware_major (HexInt = ExprAdapter on Int8ub) |
| 81 | +# Byte 20: ip_firmware_minor (HexInt) |
| 82 | +# Bytes 21-24: ip_module_serial (4 bytes) |
| 83 | +# ip_type: Pointer(21, ...) — reads byte at position 21 (= serial[0]) non-destructively |
| 84 | +# |
| 85 | +# HexInt decode: int(hex(raw_byte)[2:], 10) |
| 86 | +# e.g. raw=0x05 → hex="0x5" → "5" → int("5",10) = 5 |
| 87 | +# e.g. raw=0x50 → hex="0x50" → "50" → int("50",10) = 50 |
| 88 | + |
| 89 | + |
| 90 | +def _build_ip_payload_connect_response( |
| 91 | + login_status=0x00, # success |
| 92 | + key=b"\x00" * 16, |
| 93 | + hardware_version=0x0000, |
| 94 | + ip_firmware_major=0x05, |
| 95 | + ip_firmware_minor=0x02, |
| 96 | + ip_module_serial=b"\x71\x00\x00\x00", # IP150 serial (first byte 0x71) |
| 97 | +): |
| 98 | + """Construct raw IPPayloadConnectResponse bytes.""" |
| 99 | + assert len(key) == 16 |
| 100 | + assert len(ip_module_serial) == 4 |
| 101 | + hw_high = (hardware_version >> 8) & 0xFF |
| 102 | + hw_low = hardware_version & 0xFF |
| 103 | + return bytes( |
| 104 | + [login_status] |
| 105 | + + list(key) |
| 106 | + + [hw_high, hw_low, ip_firmware_major, ip_firmware_minor] |
| 107 | + + list(ip_module_serial) |
| 108 | + ) |
| 109 | + |
| 110 | + |
| 111 | +def test_parse_ip_payload_connect_response_success(): |
| 112 | + """Parse a successful IP connect response.""" |
| 113 | + raw = _build_ip_payload_connect_response( |
| 114 | + login_status=0x00, |
| 115 | + key=b"\xAA" * 16, |
| 116 | + hardware_version=0x0100, |
| 117 | + ip_firmware_major=0x05, |
| 118 | + ip_firmware_minor=0x02, |
| 119 | + ip_module_serial=b"\x71\x12\x34\x56", |
| 120 | + ) |
| 121 | + data = IPPayloadConnectResponse.parse(raw) |
| 122 | + |
| 123 | + assert data.login_status == "success" |
| 124 | + assert data.key == b"\xAA" * 16 |
| 125 | + assert data.hardware_version == 0x0100 |
| 126 | + assert data.ip_module_serial == b"\x71\x12\x34\x56" |
| 127 | + |
| 128 | + |
| 129 | +def test_parse_ip_payload_connect_response_invalid_password(): |
| 130 | + """Parse an invalid_password connect response.""" |
| 131 | + raw = _build_ip_payload_connect_response( |
| 132 | + login_status=0x01, |
| 133 | + key=b"\x00" * 16, |
| 134 | + ip_module_serial=b"\x71\x00\x00\x00", |
| 135 | + ) |
| 136 | + data = IPPayloadConnectResponse.parse(raw) |
| 137 | + |
| 138 | + assert data.login_status == "invalid_password" |
| 139 | + |
| 140 | + |
| 141 | +def test_parse_ip_payload_connect_response_user_already_connected(): |
| 142 | + """Parse user_already_connected connect response.""" |
| 143 | + raw = _build_ip_payload_connect_response( |
| 144 | + login_status=0x02, |
| 145 | + key=b"\x00" * 16, |
| 146 | + ip_module_serial=b"\x71\x00\x00\x00", |
| 147 | + ) |
| 148 | + data = IPPayloadConnectResponse.parse(raw) |
| 149 | + |
| 150 | + assert data.login_status == "user_already_connected" |
| 151 | + |
| 152 | + |
| 153 | +def test_parse_ip_payload_connect_response_user_already_connected1(): |
| 154 | + """Parse user_already_connected1 (0x04) connect response.""" |
| 155 | + raw = _build_ip_payload_connect_response( |
| 156 | + login_status=0x04, |
| 157 | + key=b"\x00" * 16, |
| 158 | + ip_module_serial=b"\x71\x00\x00\x00", |
| 159 | + ) |
| 160 | + data = IPPayloadConnectResponse.parse(raw) |
| 161 | + |
| 162 | + assert data.login_status == "user_already_connected1" |
| 163 | + |
| 164 | + |
| 165 | +def test_parse_ip_payload_connect_response_key_contents(): |
| 166 | + """Parse verifies the key field is preserved exactly.""" |
| 167 | + test_key = bytes(range(16)) |
| 168 | + raw = _build_ip_payload_connect_response( |
| 169 | + key=test_key, |
| 170 | + ip_module_serial=b"\x71\x00\x00\x00", |
| 171 | + ) |
| 172 | + data = IPPayloadConnectResponse.parse(raw) |
| 173 | + |
| 174 | + assert data.key == test_key |
| 175 | + |
| 176 | + |
| 177 | +def test_parse_ip_payload_connect_response_hardware_version(): |
| 178 | + """Parse verifies hardware_version is decoded as big-endian.""" |
| 179 | + raw = _build_ip_payload_connect_response( |
| 180 | + hardware_version=0x0203, |
| 181 | + ip_module_serial=b"\x71\x00\x00\x00", |
| 182 | + ) |
| 183 | + data = IPPayloadConnectResponse.parse(raw) |
| 184 | + |
| 185 | + assert data.hardware_version == 0x0203 |
| 186 | + |
| 187 | + |
| 188 | +def test_parse_ip_payload_connect_response_ip150_type(): |
| 189 | + """Parse ip_type as IP150 when first serial byte is 0x71.""" |
| 190 | + raw = _build_ip_payload_connect_response( |
| 191 | + ip_module_serial=b"\x71\xAA\xBB\xCC", |
| 192 | + ) |
| 193 | + data = IPPayloadConnectResponse.parse(raw) |
| 194 | + |
| 195 | + assert data.ip_type == "IP150" |
| 196 | + |
| 197 | + |
| 198 | +def test_parse_ip_payload_connect_response_ip100_type(): |
| 199 | + """Parse ip_type as IP100 when first serial byte is 0x70.""" |
| 200 | + raw = _build_ip_payload_connect_response( |
| 201 | + ip_module_serial=b"\x70\x00\x00\x00", |
| 202 | + ) |
| 203 | + data = IPPayloadConnectResponse.parse(raw) |
| 204 | + |
| 205 | + assert data.ip_type == "IP100" |
| 206 | + |
| 207 | + |
| 208 | +def test_parse_ip_payload_connect_response_firmware_versions(): |
| 209 | + """Parse ip_firmware_major and ip_firmware_minor using HexInt adapter.""" |
| 210 | + # HexInt: raw 0x05 → int(hex(5)[2:], 10) = int("5", 10) = 5 |
| 211 | + # HexInt: raw 0x02 → int(hex(2)[2:], 10) = int("2", 10) = 2 |
| 212 | + raw = _build_ip_payload_connect_response( |
| 213 | + ip_firmware_major=0x05, |
| 214 | + ip_firmware_minor=0x02, |
| 215 | + ip_module_serial=b"\x71\x00\x00\x00", |
| 216 | + ) |
| 217 | + data = IPPayloadConnectResponse.parse(raw) |
| 218 | + |
| 219 | + assert data.ip_firmware_major == 5 |
| 220 | + assert data.ip_firmware_minor == 2 |
| 221 | + |
| 222 | + |
| 223 | +def test_parse_ip_payload_connect_response_serial_preserved(): |
| 224 | + """Parse verifies ip_module_serial bytes are preserved.""" |
| 225 | + serial = b"\x71\x01\x02\x03" |
| 226 | + raw = _build_ip_payload_connect_response(ip_module_serial=serial) |
| 227 | + data = IPPayloadConnectResponse.parse(raw) |
| 228 | + |
| 229 | + assert data.ip_module_serial == serial |
0 commit comments