Skip to content

Commit 40d0115

Browse files
Establish OpenWRT firewall rule parser
This commit adds a firewall rule UCI parser to the OpenWRT backend. This commit includes: - Fixing test_default.py to use the new parser - New tests in test_firewall.py
1 parent 7837ad8 commit 40d0115

File tree

3 files changed

+270
-115
lines changed

3 files changed

+270
-115
lines changed

netjsonconfig/backends/openwrt/converters/firewall.py

Lines changed: 50 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,18 @@
55

66

77
class Firewall(OpenWrtConverter):
8-
netjson_key = 'firewall'
9-
intermediate_key = 'firewall'
10-
_uci_types = ['defaults', 'forwarding', 'zone', 'rule']
11-
_schema = schema['properties']['firewall']
8+
netjson_key = "firewall"
9+
intermediate_key = "firewall"
10+
_uci_types = ["defaults", "forwarding", "zone", "rule"]
11+
_schema = schema["properties"]["firewall"]
1212

1313
def to_intermediate_loop(self, block, result, index=None):
14-
forwardings = self.__intermediate_forwardings(block.pop('forwardings', {}))
15-
zones = self.__intermediate_zones(block.pop('zones', {}))
16-
rules = self.__intermediate_rules(block.pop('rules', {}))
17-
block.update({
18-
'.type': 'defaults',
19-
'.name': block.pop('id', 'defaults'),
20-
})
21-
result.setdefault('firewall', [])
22-
result['firewall'] = [self.sorted_dict(block)] + forwardings + zones + rules
14+
forwardings = self.__intermediate_forwardings(block.pop("forwardings", {}))
15+
zones = self.__intermediate_zones(block.pop("zones", {}))
16+
rules = self.__intermediate_rules(block.pop("rules", {}))
17+
block.update({".type": "defaults", ".name": block.pop("id", "defaults")})
18+
result.setdefault("firewall", [])
19+
result["firewall"] = [self.sorted_dict(block)] + forwardings + zones + rules
2320
return result
2421

2522
def __intermediate_forwardings(self, forwardings):
@@ -29,19 +26,26 @@ def __intermediate_forwardings(self, forwardings):
2926
"""
3027
result = []
3128
for forwarding in forwardings:
32-
resultdict = OrderedDict((('.name', self.__get_auto_name_forwarding(forwarding)),
33-
('.type', 'forwarding')))
29+
resultdict = OrderedDict(
30+
(
31+
(".name", self.__get_auto_name_forwarding(forwarding)),
32+
(".type", "forwarding"),
33+
)
34+
)
3435
resultdict.update(forwarding)
3536
result.append(resultdict)
3637
return result
3738

3839
def __get_auto_name_forwarding(self, forwarding):
39-
if 'family' in forwarding.keys():
40-
uci_name = self._get_uci_name('_'.join([forwarding['src'], forwarding['dest'],
41-
forwarding['family']]))
40+
if "family" in forwarding.keys():
41+
uci_name = self._get_uci_name(
42+
"_".join([forwarding["src"], forwarding["dest"], forwarding["family"]])
43+
)
4244
else:
43-
uci_name = self._get_uci_name('_'.join([forwarding['src'], forwarding['dest']]))
44-
return 'forwarding_{0}'.format(uci_name)
45+
uci_name = self._get_uci_name(
46+
"_".join([forwarding["src"], forwarding["dest"]])
47+
)
48+
return "forwarding_{0}".format(uci_name)
4549

4650
def __intermediate_zones(self, zones):
4751
"""
@@ -50,14 +54,15 @@ def __intermediate_zones(self, zones):
5054
"""
5155
result = []
5256
for zone in zones:
53-
resultdict = OrderedDict((('.name', self.__get_auto_name_zone(zone)),
54-
('.type', 'zone')))
57+
resultdict = OrderedDict(
58+
((".name", self.__get_auto_name_zone(zone)), (".type", "zone"))
59+
)
5560
resultdict.update(zone)
5661
result.append(resultdict)
5762
return result
5863

5964
def __get_auto_name_zone(self, zone):
60-
return 'zone_{0}'.format(self._get_uci_name(zone['name']))
65+
return "zone_{0}".format(self._get_uci_name(zone["name"]))
6166

6267
def __intermediate_rules(self, rules):
6368
"""
@@ -66,24 +71,33 @@ def __intermediate_rules(self, rules):
6671
"""
6772
result = []
6873
for rule in rules:
69-
if 'config_name' in rule:
70-
del rule['config_name']
71-
resultdict = OrderedDict((('.name', self.__get_auto_name_rule(rule)),
72-
('.type', 'rule')))
74+
if "config_name" in rule:
75+
del rule["config_name"]
76+
resultdict = OrderedDict(
77+
((".name", self.__get_auto_name_rule(rule)), (".type", "rule"))
78+
)
7379
resultdict.update(rule)
7480
result.append(resultdict)
7581
return result
7682

7783
def __get_auto_name_rule(self, rule):
78-
return 'rule_{0}'.format(self._get_uci_name(rule['name']))
84+
return "rule_{0}".format(self._get_uci_name(rule["name"]))
7985

8086
def to_netjson_loop(self, block, result, index):
81-
result['firewall'] = self.__netjson_firewall(block)
82-
return result
87+
result.setdefault("firewall", {})
88+
89+
block.pop(".name")
90+
_type = block.pop(".type")
91+
92+
if _type == "rule":
93+
rule = self.__netjson_rule(block)
94+
result["firewall"].setdefault("rules", [])
95+
result["firewall"]["rules"].append(rule)
96+
97+
return self.type_cast(result)
98+
99+
def __netjson_rule(self, rule):
100+
if "enabled" in rule:
101+
rule["enabled"] = rule.pop("enabled") == "1"
83102

84-
def __netjson_firewall(self, firewall):
85-
del firewall['.type']
86-
_name = firewall.pop('.name')
87-
if _name != 'firewall':
88-
firewall['id'] = _name
89-
return self.type_cast(firewall)
103+
return self.type_cast(rule)

tests/openwrt/test_default.py

Lines changed: 78 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -8,78 +8,72 @@ class TestDefault(unittest.TestCase, _TabsMixin):
88
maxDiff = None
99

1010
def test_render_default(self):
11-
o = OpenWrt({
12-
"luci": [
13-
{
14-
"config_name": "core",
15-
"config_value": "main",
16-
"lang": "auto",
17-
"resourcebase": "/luci-static/resources",
18-
"mediaurlbase": "/luci-static/bootstrap",
19-
"number": 4,
20-
"boolean": True
21-
}
22-
],
23-
"firewall": {
24-
"rules": [
25-
{
26-
"config_name": "rule",
27-
"name": "Allow-MLD",
28-
"src": "wan",
29-
"proto": "icmp",
30-
"src_ip": "fe80::/10",
31-
"family": "ipv6",
32-
"target": "ACCEPT",
33-
"icmp_type": [
34-
"130/0",
35-
"131/0",
36-
"132/0",
37-
"143/0"
38-
]
39-
},
11+
o = OpenWrt(
12+
{
13+
"luci": [
4014
{
41-
"config_name": "rule",
42-
"name": "Rule2",
43-
"src": "wan",
44-
"proto": "icmp",
45-
"src_ip": "192.168.1.1/24",
46-
"family": "ipv4",
47-
"target": "ACCEPT",
48-
"icmp_type": [
49-
"130/0",
50-
"131/0",
51-
"132/0",
52-
"143/0"
53-
]
15+
"config_name": "core",
16+
"config_value": "main",
17+
"lang": "auto",
18+
"resourcebase": "/luci-static/resources",
19+
"mediaurlbase": "/luci-static/bootstrap",
20+
"number": 4,
21+
"boolean": True,
5422
}
55-
]
23+
],
24+
"firewall": {
25+
"rules": [
26+
{
27+
"name": "Allow-MLD",
28+
"src": "wan",
29+
"proto": "icmp",
30+
"src_ip": "fe80::/10",
31+
"family": "ipv6",
32+
"target": "ACCEPT",
33+
"icmp_type": ["130/0", "131/0", "132/0", "143/0"],
34+
},
35+
{
36+
"name": "Rule2",
37+
"src": "wan",
38+
"proto": "icmp",
39+
"src_ip": "192.168.1.1/24",
40+
"family": "ipv4",
41+
"target": "ACCEPT",
42+
"icmp_type": ["130/0", "131/0", "132/0", "143/0"],
43+
},
44+
]
45+
},
5646
}
57-
})
58-
expected = self._tabs("""package firewall
47+
)
48+
expected = self._tabs(
49+
"""\
50+
package firewall
51+
52+
config defaults 'defaults'
5953
6054
config rule 'rule_Allow_MLD'
61-
option family 'ipv6'
62-
list icmp_type '130/0'
63-
list icmp_type '131/0'
64-
list icmp_type '132/0'
65-
list icmp_type '143/0'
6655
option name 'Allow-MLD'
67-
option proto 'icmp'
6856
option src 'wan'
57+
option proto 'icmp'
6958
option src_ip 'fe80::/10'
59+
option family 'ipv6'
7060
option target 'ACCEPT'
71-
72-
config rule 'rule_Rule2'
73-
option family 'ipv4'
7461
list icmp_type '130/0'
7562
list icmp_type '131/0'
7663
list icmp_type '132/0'
7764
list icmp_type '143/0'
65+
66+
config rule 'rule_Rule2'
7867
option name 'Rule2'
79-
option proto 'icmp'
8068
option src 'wan'
69+
option proto 'icmp'
8170
option src_ip '192.168.1.1/24'
71+
option family 'ipv4'
8272
option target 'ACCEPT'
73+
list icmp_type '130/0'
74+
list icmp_type '131/0'
75+
list icmp_type '132/0'
76+
list icmp_type '143/0'
8377
8478
package luci
8579
@@ -142,54 +136,59 @@ def test_parse_default(self):
142136
)
143137
o = OpenWrt(native=native)
144138
expected = {
145-
"luci": [
139+
"led": [
146140
{
147-
"config_name": "core",
148-
"config_value": "main",
149-
"lang": "auto",
150-
"resourcebase": "/luci-static/resources",
151-
"mediaurlbase": "/luci-static/bootstrap",
152-
"number": "4",
153-
"boolean": "1",
141+
"dev": "1-1.1",
142+
"interval": 50,
143+
"name": "USB1",
144+
"sysfs": "tp-link:green:usb1",
145+
"trigger": "usbdev",
154146
}
155147
],
148+
"interfaces": [{"name": "eth0", "type": "ethernet"}],
156149
"firewall": {
157150
"rules": [
158151
{
159-
"config_name": "rule",
152+
"family": "ipv6",
153+
"icmp_type": ["130/0", "131/0", "132/0", "143/0"],
160154
"name": "Allow-MLD",
161-
"src": "wan",
162155
"proto": "icmp",
156+
"src": "wan",
163157
"src_ip": "fe80::/10",
164-
"family": "ipv6",
165158
"target": "ACCEPT",
166-
"icmp_type": ["130/0", "131/0", "132/0", "143/0"]
167159
}
168160
]
169161
},
170-
"led": [
162+
"luci": [
171163
{
172-
"name": "USB1",
173-
"sysfs": "tp-link:green:usb1",
174-
"trigger": "usbdev",
175-
"dev": "1-1.1",
176-
"interval": 50,
164+
"boolean": "1",
165+
"lang": "auto",
166+
"mediaurlbase": "/luci-static/bootstrap",
167+
"number": "4",
168+
"resourcebase": "/luci-static/resources",
169+
"config_value": "main",
170+
"config_name": "core",
177171
}
178172
],
179-
"interfaces": [{"name": "eth0", "type": "ethernet"}],
180173
"system": [
181-
{"test": "1", "config_name": "custom", "config_value": "custom"}
174+
{"test": "1", "config_value": "custom", "config_name": "custom"}
182175
],
183176
}
177+
178+
print("*" * 80)
179+
import json
180+
181+
print(json.dumps(o.config, indent=4))
182+
print("*" * 80)
184183
self.assertDictEqual(o.config, expected)
185184

186185
def test_skip(self):
187186
o = OpenWrt({"skipme": {"enabled": True}})
188-
self.assertEqual(o.render(), '')
187+
self.assertEqual(o.render(), "")
189188

190189
def test_warning(self):
191190
o = OpenWrt({"luci": [{"unrecognized": True}]})
192-
self.assertEqual(o.render(), '')
191+
self.assertEqual(o.render(), "")
193192

194193
def test_merge(self):
195194
template = {
@@ -228,8 +227,8 @@ def test_merge(self):
228227
self.assertEqual(o.config, expected)
229228

230229
def test_skip_nonlists(self):
231-
o = OpenWrt({"custom_package": {'unknown': True}})
232-
self.assertEqual(o.render(), '')
230+
o = OpenWrt({"custom_package": {"unknown": True}})
231+
self.assertEqual(o.render(), "")
233232

234233
def test_render_invalid_uci_name(self):
235234
o = OpenWrt(

0 commit comments

Comments
 (0)