Skip to content

Commit 4b7dfb4

Browse files
committed
Make Docker and local process communication more robust
1 parent c7d16e0 commit 4b7dfb4

4 files changed

Lines changed: 161 additions & 32 deletions

File tree

PyStemmusScope/bmi/docker_process.py

Lines changed: 67 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
11
"""The Docker STEMMUS_SCOPE model process wrapper."""
22
import os
3+
import socket as pysocket
4+
import warnings
35
from time import sleep
46
from typing import Any
57
from PyStemmusScope.bmi.docker_utils import check_tags
68
from PyStemmusScope.bmi.docker_utils import find_image
79
from PyStemmusScope.bmi.docker_utils import make_docker_vols_binds
10+
from PyStemmusScope.bmi.utils import MATLAB_ERROR
11+
from PyStemmusScope.bmi.utils import PROCESS_FINALIZED
12+
from PyStemmusScope.bmi.utils import PROCESS_READY
13+
from PyStemmusScope.bmi.utils import MatlabError
814
from PyStemmusScope.config_io import read_config
915

1016

@@ -14,18 +20,64 @@
1420
docker = None
1521

1622

17-
def wait_for_model(phrase: bytes, socket: Any) -> None:
23+
def _model_is_ready(socket: Any, client: Any, container_id: Any) -> None:
24+
return _wait_for_model(PROCESS_READY, socket, client, container_id)
25+
26+
27+
def _model_is_finalized(socket: Any, client: Any, container_id: Any) -> None:
28+
return _wait_for_model(PROCESS_FINALIZED, socket, client, container_id)
29+
30+
31+
def _wait_for_model(phrase: bytes, socket: Any, client: Any, container_id: Any) -> None:
1832
"""Wait for the model to be ready to receive (more) commands, or is finalized."""
1933
output = b""
2034

2135
while phrase not in output:
22-
data = socket.read(1)
36+
try:
37+
data = socket.read(1)
38+
except TimeoutError as err:
39+
client.stop(container_id)
40+
logs = client.logs(container_id).decode("utf-8")
41+
msg = (
42+
f"Container connection timed out '{container_id['Id']}'."
43+
f"\nPlease inspect logs:\n{logs}"
44+
)
45+
raise TimeoutError(msg) from err
46+
2347
if data is None:
2448
msg = "Could not read data from socket. Docker container might be dead."
2549
raise ConnectionError(msg)
2650
else:
2751
output += bytes(data)
2852

53+
if MATLAB_ERROR in output:
54+
client.stop(container_id)
55+
logs = client.logs(container_id).decode("utf-8")
56+
msg = (
57+
f"Error in container '{container_id['Id']}'.\n"
58+
f"Please inspect logs:\n{logs}"
59+
)
60+
raise MatlabError(msg)
61+
62+
63+
def _attach_socket(client, container_id) -> Any:
64+
"""Attach a socket to a container and add a timeout to it."""
65+
connection_timeout = 30 # seconds
66+
67+
socket = client.attach_socket(container_id, {"stdin": 1, "stdout": 1, "stream": 1})
68+
if isinstance(socket, pysocket.SocketIO):
69+
socket._sock.settimeout(connection_timeout) # type: ignore
70+
else:
71+
warnings.warn(
72+
message=(
73+
"Unknown socket type found. This might cause issues with the Docker"
74+
" connection. \nPlease report this to the developers in an issue "
75+
"on: https://github.com/EcoExtreML/STEMMUS_SCOPE_Processing/issues"
76+
),
77+
stacklevel=1,
78+
)
79+
return socket
80+
2981

3082
class StemmusScopeDocker:
3183
"""Communicate with a STEMMUS_SCOPE Docker container."""
@@ -60,9 +112,9 @@ def __init__(self, cfg_file: str):
60112

61113
self.running = False
62114

63-
def wait_for_model(self) -> None:
115+
def _wait_for_model(self) -> None:
64116
"""Wait for the model to be ready to receive (more) commands."""
65-
wait_for_model(self._process_ready_phrase, self.socket)
117+
_model_is_ready(self.socket, self.client, self.container_id)
66118

67119
def is_alive(self) -> bool:
68120
"""Return if the process is alive."""
@@ -74,23 +126,22 @@ def initialize(self) -> None:
74126
self.client.stop(self.container_id)
75127

76128
self.client.start(self.container_id)
77-
self.socket = self.client.attach_socket(
78-
self.container_id, {"stdin": 1, "stdout": 1, "stream": 1}
79-
)
80-
self.wait_for_model()
129+
self.socket = _attach_socket(self.client, self.container_id)
130+
131+
self._wait_for_model()
81132
os.write(
82133
self.socket.fileno(),
83134
bytes(f'initialize "{self.cfg_file}"\n', encoding="utf-8"),
84135
)
85-
self.wait_for_model()
136+
self._wait_for_model()
86137

87138
self.running = True
88139

89140
def update(self) -> None:
90141
"""Update the model and wait for it to be ready."""
91142
if self.is_alive():
92143
os.write(self.socket.fileno(), b"update\n")
93-
self.wait_for_model()
144+
self._wait_for_model()
94145
else:
95146
msg = "Docker container is not alive. Please restart the model."
96147
raise ConnectionError(msg)
@@ -99,9 +150,14 @@ def finalize(self) -> None:
99150
"""Finalize the model."""
100151
if self.is_alive():
101152
os.write(self.socket.fileno(), b"finalize\n")
102-
wait_for_model(self._process_finalized_phrase, self.socket)
153+
_model_is_finalized(
154+
self.socket,
155+
self.client,
156+
self.container_id,
157+
)
103158
sleep(0.5) # Ensure the container can stop cleanly.
104159
self.client.stop(self.container_id)
160+
self.running = False
105161
self.client.remove_container(self.container_id, v=True)
106162
else:
107163
pass

PyStemmusScope/bmi/local_process.py

Lines changed: 79 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,17 @@
11
"""The local STEMMUS_SCOPE model process wrapper."""
22
import os
3+
import platform
34
import subprocess
45
from pathlib import Path
6+
from time import sleep
57
from typing import Union
8+
from PyStemmusScope.bmi.utils import MATLAB_ERROR
9+
from PyStemmusScope.bmi.utils import PROCESS_READY
10+
from PyStemmusScope.bmi.utils import MatlabError
611
from PyStemmusScope.config_io import read_config
712

813

9-
def is_alive(process: Union[subprocess.Popen, None]) -> subprocess.Popen:
14+
def alive_process(process: Union[subprocess.Popen, None]) -> subprocess.Popen:
1015
"""Return process if the process is alive, raise an exception if it is not."""
1116
if process is None:
1217
msg = "Model process does not seem to be open."
@@ -17,12 +22,39 @@ def is_alive(process: Union[subprocess.Popen, None]) -> subprocess.Popen:
1722
return process
1823

1924

20-
def wait_for_model(process: subprocess.Popen, phrase=b"Select BMI mode:") -> None:
25+
def read_stdout(process: subprocess.Popen) -> bytes:
26+
"""Read from stdout. If the stream ends unexpectedly, an error is raised."""
27+
assert process.stdout is not None # required for type narrowing.
28+
read = process.stdout.read(1)
29+
if read is None:
30+
sleep(5)
31+
read = process.stdout.read(1)
32+
if read is not None:
33+
return bytes(read)
34+
msg = "Connection error: could not find expected output or "
35+
raise ConnectionError(msg)
36+
return bytes(read)
37+
38+
39+
def _model_is_ready(process: subprocess.Popen) -> None:
40+
return _wait_for_model(PROCESS_READY, process)
41+
42+
43+
def _wait_for_model(phrase: bytes, process: subprocess.Popen) -> None:
2144
"""Wait for model to be ready for interaction."""
2245
output = b""
23-
while is_alive(process) and phrase not in output:
24-
assert process.stdout is not None # required for type narrowing.
25-
output += bytes(process.stdout.read(1))
46+
47+
while alive_process(process) and phrase not in output:
48+
output += read_stdout(process)
49+
if MATLAB_ERROR in output:
50+
try:
51+
process.terminate()
52+
finally:
53+
msg = (
54+
"Error encountered in Matlab.\n"
55+
"Please inspect logs in the output directory"
56+
)
57+
raise MatlabError(msg)
2658

2759

2860
def find_exe(config: dict) -> str:
@@ -51,46 +83,73 @@ def __init__(self, cfg_file: str) -> None:
5183
exe_file = find_exe(config)
5284
args = [exe_file, cfg_file, "bmi"]
5385

54-
os.environ["MATLAB_LOG_DIR"] = str(config["InputPath"])
55-
56-
self.matlab_process = subprocess.Popen(
86+
lib_path = os.getenv("LD_LIBRARY_PATH")
87+
if lib_path is None:
88+
msg = (
89+
"Environment variable LD_LIBRARY_PATH not found. "
90+
"Refer the Matlab Compiler Runtime documentation"
91+
)
92+
raise ValueError(msg)
93+
94+
# Ensure output directory exists so log file can be written:
95+
Path(config["OutputPath"]).mkdir(parents=True, exist_ok=True)
96+
env = {
97+
"LD_LIBRARY_PATH": lib_path,
98+
"MATLAB_LOG_DIR": str(config["OutputPath"]),
99+
}
100+
101+
self.process = subprocess.Popen(
57102
args,
58103
stdin=subprocess.PIPE,
59104
stdout=subprocess.PIPE,
60105
bufsize=0,
106+
env=env,
61107
)
62108

63-
wait_for_model(self.matlab_process)
109+
if platform.system() == "Linux":
110+
assert self.process.stdout is not None # required for type narrowing.
111+
# Make the connection non-blocking to allow for a timeout on read.
112+
os.set_blocking(self.process.stdout.fileno(), False)
113+
else:
114+
msg = "Unexpected system. The executable is only compiled for Linux."
115+
raise ValueError(msg)
116+
_model_is_ready(self.process)
64117

65118
def is_alive(self) -> bool:
66119
"""Return if the process is alive."""
67120
try:
68-
is_alive(self.matlab_process)
121+
alive_process(self.process)
69122
return True
70123
except ConnectionError:
71124
return False
72125

73126
def initialize(self) -> None:
74127
"""Initialize the model and wait for it to be ready."""
75-
self.matlab_process = is_alive(self.matlab_process)
128+
self.process = alive_process(self.process)
76129

77-
self.matlab_process.stdin.write( # type: ignore
130+
self.process.stdin.write( # type: ignore
78131
bytes(f'initialize "{self.cfg_file}"\n', encoding="utf-8")
79132
)
80-
wait_for_model(self.matlab_process)
133+
_model_is_ready(self.process)
81134

82135
def update(self) -> None:
83136
"""Update the model and wait for it to be ready."""
84-
if self.matlab_process is None:
137+
if self.process is None:
85138
msg = "Run initialize before trying to update the model."
86139
raise AttributeError(msg)
87140

88-
self.matlab_process = is_alive(self.matlab_process)
89-
self.matlab_process.stdin.write(b"update\n") # type: ignore
90-
wait_for_model(self.matlab_process)
141+
self.process = alive_process(self.process)
142+
self.process.stdin.write(b"update\n") # type: ignore
143+
_model_is_ready(self.process)
91144

92145
def finalize(self) -> None:
93146
"""Finalize the model."""
94-
self.matlab_process = is_alive(self.matlab_process)
95-
self.matlab_process.stdin.write(b"finalize\n") # type: ignore
96-
wait_for_model(self.matlab_process, phrase=b"Finished clean up.")
147+
self.process = alive_process(self.process)
148+
self.process.stdin.write(b"finalize\n") # type: ignore
149+
sleep(10)
150+
if self.process.poll() != 0:
151+
try:
152+
self.process.terminate()
153+
finally:
154+
msg = f"Model terminated with return code {self.process.poll()}"
155+
raise ValueError(msg)

PyStemmusScope/bmi/utils.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,24 @@
22
import numpy as np
33

44

5+
# Phrases defined in the Matlab code to check for:
6+
PROCESS_READY = b"Select BMI mode:"
7+
PROCESS_FINALIZED = b"Finished clean up."
8+
MATLAB_ERROR = b"Error in "
9+
10+
511
INAPPLICABLE_GRID_METHOD_MSG = (
612
"This grid method is not implmented for the STEMMUS_SCOPE BMI because the model is"
713
"\non a rectilinear grid."
814
)
915

1016

17+
class MatlabError(Exception):
18+
"""Matlab code encountered an error."""
19+
20+
pass
21+
22+
1123
class InapplicableBmiMethods:
1224
"""Holds methods that are not applicable for STEMMUS_SCOPE's rectilinear grid."""
1325

pyproject.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ dev = [
7777
"mypy",
7878
"pytest",
7979
"pytest-cov",
80+
"types-requests", # type stubs
8081
]
8182
docs = [
8283
"mkdocs",
@@ -87,7 +88,7 @@ docs = [
8788
]
8889

8990
[tool.hatch.envs.default]
90-
features = ["dev"]
91+
features = ["dev", "docker"]
9192

9293
[tool.hatch.envs.default.scripts]
9394
lint = [
@@ -143,6 +144,7 @@ extend-select = [
143144
ignore = [
144145
"E501", # Line length: fails on many docstrings (needs fixing).
145146
"PLR2004", # magic value used in comparsion (i.e. `if ndays == 28: month_is_feb`).
147+
"B009", # getattr is useful to not mess with typing.
146148
]
147149
line-length = 88
148150
exclude = ["docs", "build"]

0 commit comments

Comments
 (0)