Skip to content

Commit 8fcdb40

Browse files
authored
Merge pull request #9 from Together-Java/feat/use-docker-java-client
Feat/use docker java client
2 parents 7bb9d99 + 7a927d7 commit 8fcdb40

6 files changed

Lines changed: 198 additions & 39 deletions

File tree

JShellAPI/build.gradle

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ repositories {
2424
dependencies {
2525
implementation project(':JShellWrapper')
2626
implementation 'org.springframework.boot:spring-boot-starter-web'
27+
implementation 'com.github.docker-java:docker-java-transport-httpclient5:3.3.4'
28+
implementation 'com.github.docker-java:docker-java-core:3.3.4'
29+
2730
testImplementation 'org.springframework.boot:spring-boot-starter-test'
2831
annotationProcessor "org.springframework.boot:spring-boot-configuration-processor"
2932
}

JShellAPI/src/main/java/org/togetherjava/jshellapi/Config.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ public record Config(
1111
long maxAliveSessions,
1212
int dockerMaxRamMegaBytes,
1313
double dockerCPUsUsage,
14-
long schedulerSessionKillScanRateSeconds) {
14+
long schedulerSessionKillScanRateSeconds,
15+
long dockerResponseTimeout,
16+
long dockerConnectionTimeout) {
1517
public Config {
1618
if(regularSessionTimeoutSeconds <= 0) throw new RuntimeException("Invalid value " + regularSessionTimeoutSeconds);
1719
if(oneTimeSessionTimeoutSeconds <= 0) throw new RuntimeException("Invalid value " + oneTimeSessionTimeoutSeconds);
@@ -21,5 +23,7 @@ public record Config(
2123
if(dockerMaxRamMegaBytes <= 0) throw new RuntimeException("Invalid value " + dockerMaxRamMegaBytes);
2224
if(dockerCPUsUsage <= 0) throw new RuntimeException("Invalid value " + dockerCPUsUsage);
2325
if(schedulerSessionKillScanRateSeconds <= 0) throw new RuntimeException("Invalid value " + schedulerSessionKillScanRateSeconds);
26+
if(dockerResponseTimeout <= 0) throw new RuntimeException("Invalid value " + dockerResponseTimeout);
27+
if(dockerConnectionTimeout <= 0) throw new RuntimeException("Invalid value " + dockerConnectionTimeout);
2428
}
2529
}
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
package org.togetherjava.jshellapi.service;
2+
3+
import com.github.dockerjava.api.DockerClient;
4+
import com.github.dockerjava.api.async.ResultCallback;
5+
import com.github.dockerjava.api.command.PullImageResultCallback;
6+
import com.github.dockerjava.api.model.*;
7+
import com.github.dockerjava.core.DefaultDockerClientConfig;
8+
import com.github.dockerjava.core.DockerClientImpl;
9+
import com.github.dockerjava.httpclient5.ApacheDockerHttpClient;
10+
import org.slf4j.Logger;
11+
import org.slf4j.LoggerFactory;
12+
import org.springframework.beans.factory.DisposableBean;
13+
import org.springframework.beans.factory.annotation.Autowired;
14+
import org.springframework.stereotype.Service;
15+
import org.togetherjava.jshellapi.Config;
16+
17+
import java.io.*;
18+
import java.nio.charset.StandardCharsets;
19+
import java.time.Duration;
20+
import java.util.*;
21+
import java.util.concurrent.TimeUnit;
22+
23+
@Service
24+
public class DockerService implements DisposableBean {
25+
private static final Logger LOGGER = LoggerFactory.getLogger(DockerService.class);
26+
private static final String WORKER_LABEL = "jshell-api-worker";
27+
private static final UUID WORKER_UNIQUE_ID = UUID.randomUUID();
28+
29+
private final DockerClient client;
30+
31+
public DockerService(Config config) {
32+
DefaultDockerClientConfig clientConfig = DefaultDockerClientConfig.createDefaultConfigBuilder().build();
33+
ApacheDockerHttpClient httpClient = new ApacheDockerHttpClient.Builder()
34+
.dockerHost(clientConfig.getDockerHost())
35+
.sslConfig(clientConfig.getSSLConfig())
36+
.responseTimeout(Duration.ofSeconds(config.dockerResponseTimeout()))
37+
.connectionTimeout(Duration.ofSeconds(config.dockerConnectionTimeout()))
38+
.build();
39+
this.client = DockerClientImpl.getInstance(clientConfig, httpClient);
40+
41+
cleanupLeftovers(WORKER_UNIQUE_ID);
42+
}
43+
44+
private void cleanupLeftovers(UUID currentId) {
45+
for (Container container : client.listContainersCmd().withLabelFilter(Set.of(WORKER_LABEL)).exec()) {
46+
String containerHumanName = container.getId() + " " + Arrays.toString(container.getNames());
47+
LOGGER.info("Found worker container '{}'", containerHumanName);
48+
if (!container.getLabels().get(WORKER_LABEL).equals(currentId.toString())) {
49+
LOGGER.info("Killing container '{}'", containerHumanName);
50+
client.killContainerCmd(container.getId()).exec();
51+
}
52+
}
53+
}
54+
55+
public String spawnContainer(
56+
long maxMemoryMegs, long cpus, String name, Duration evalTimeout, long sysoutLimit
57+
) throws InterruptedException {
58+
String imageName = "togetherjava.org:5001/togetherjava/jshellwrapper";
59+
boolean presentLocally = client.listImagesCmd()
60+
.withFilter("reference", List.of(imageName))
61+
.exec()
62+
.stream()
63+
.flatMap(it -> Arrays.stream(it.getRepoTags()))
64+
.anyMatch(it -> it.endsWith(":master"));
65+
66+
if (!presentLocally) {
67+
client.pullImageCmd(imageName)
68+
.withTag("master")
69+
.exec(new PullImageResultCallback())
70+
.awaitCompletion(5, TimeUnit.MINUTES);
71+
}
72+
73+
return client.createContainerCmd(
74+
imageName + ":master"
75+
)
76+
.withHostConfig(
77+
HostConfig.newHostConfig()
78+
.withAutoRemove(true)
79+
.withInit(true)
80+
.withCapDrop(Capability.ALL)
81+
.withNetworkMode("none")
82+
.withPidsLimit(2000L)
83+
.withReadonlyRootfs(true)
84+
.withMemory(maxMemoryMegs * 1024 * 1024)
85+
.withCpuCount(cpus)
86+
)
87+
.withStdinOpen(true)
88+
.withAttachStdin(true)
89+
.withAttachStderr(true)
90+
.withAttachStdout(true)
91+
.withEnv("evalTimeoutSeconds=" + evalTimeout.toSeconds(), "sysOutCharLimit=" + sysoutLimit)
92+
.withLabels(Map.of(WORKER_LABEL, WORKER_UNIQUE_ID.toString()))
93+
.withName(name)
94+
.exec()
95+
.getId();
96+
}
97+
98+
public InputStream startAndAttachToContainer(String containerId, InputStream stdin) throws IOException {
99+
PipedInputStream pipeIn = new PipedInputStream();
100+
PipedOutputStream pipeOut = new PipedOutputStream(pipeIn);
101+
102+
client.attachContainerCmd(containerId)
103+
.withLogs(true)
104+
.withFollowStream(true)
105+
.withStdOut(true)
106+
.withStdErr(true)
107+
.withStdIn(stdin)
108+
.exec(new ResultCallback.Adapter<>() {
109+
@Override
110+
public void onNext(Frame object) {
111+
try {
112+
String payloadString = new String(object.getPayload(), StandardCharsets.UTF_8);
113+
if (object.getStreamType() == StreamType.STDOUT) {
114+
pipeOut.write(object.getPayload());
115+
} else {
116+
LOGGER.warn(
117+
"Received STDERR from container {}: {}",
118+
containerId,
119+
payloadString
120+
);
121+
}
122+
} catch (IOException e) {
123+
throw new UncheckedIOException(e);
124+
}
125+
}
126+
});
127+
128+
client.startContainerCmd(containerId).exec();
129+
return pipeIn;
130+
}
131+
132+
public void killContainerByName(String name) {
133+
for (Container container : client.listContainersCmd().withNameFilter(Set.of(name)).exec()) {
134+
client.killContainerCmd(container.getId()).exec();
135+
}
136+
}
137+
138+
public boolean isDead(String containerName) {
139+
return client.listContainersCmd().withNameFilter(Set.of(containerName)).exec().isEmpty();
140+
}
141+
142+
@Override
143+
public void destroy() throws Exception {
144+
LOGGER.info("destroy() called. Destroying all containers...");
145+
cleanupLeftovers(UUID.randomUUID());
146+
client.close();
147+
}
148+
149+
}

JShellAPI/src/main/java/org/togetherjava/jshellapi/service/JShellService.java

Lines changed: 28 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,10 @@
44
import org.togetherjava.jshellapi.dto.*;
55
import org.togetherjava.jshellapi.exceptions.DockerException;
66

7-
import java.io.BufferedReader;
8-
import java.io.BufferedWriter;
9-
import java.io.File;
10-
import java.io.IOException;
7+
import java.io.*;
118
import java.nio.file.Files;
129
import java.nio.file.Path;
10+
import java.time.Duration;
1311
import java.time.Instant;
1412
import java.util.ArrayList;
1513
import java.util.List;
@@ -18,16 +16,17 @@
1816
public class JShellService implements Closeable {
1917
private final JShellSessionService sessionService;
2018
private final String id;
21-
private Process process;
2219
private final BufferedWriter writer;
2320
private final BufferedReader reader;
2421

2522
private Instant lastTimeoutUpdate;
2623
private final long timeout;
2724
private final boolean renewable;
2825
private boolean doingOperation;
26+
private final DockerService dockerService;
2927

30-
public JShellService(JShellSessionService sessionService, String id, long timeout, boolean renewable, long evalTimeout, int sysOutCharLimit, int maxMemory, double cpus, String startupScript) throws DockerException {
28+
public JShellService(DockerService dockerService, JShellSessionService sessionService, String id, long timeout, boolean renewable, long evalTimeout, int sysOutCharLimit, int maxMemory, double cpus, String startupScript) throws DockerException {
29+
this.dockerService = dockerService;
3130
this.sessionService = sessionService;
3231
this.id = id;
3332
this.timeout = timeout;
@@ -39,30 +38,23 @@ public JShellService(JShellSessionService sessionService, String id, long timeou
3938
Files.createDirectories(errorLogs.getParent());
4039
Files.createFile(errorLogs);
4140
}
42-
process = new ProcessBuilder(
43-
"docker",
44-
"run",
45-
"--rm",
46-
"-i",
47-
"--init",
48-
"--cap-drop=ALL",
49-
"--network=none",
50-
"--pids-limit=2000",
51-
"--read-only",
52-
"--memory=" + maxMemory + "m",
53-
"--cpus=" + cpus,
54-
"--name", containerName(),
55-
"-e", "\"evalTimeoutSeconds=%d\"".formatted(evalTimeout),
56-
"-e", "\"sysOutCharLimit=%d\"".formatted(sysOutCharLimit),
57-
"togetherjava.org:5001/togetherjava/jshellwrapper:master")
58-
.directory(new File(".."))
59-
.redirectError(errorLogs.toFile())
60-
.start();
61-
writer = process.outputWriter();
62-
reader = process.inputReader();
41+
String containerId = dockerService.spawnContainer(
42+
maxMemory,
43+
(long) Math.ceil(cpus),
44+
containerName(),
45+
Duration.ofSeconds(evalTimeout),
46+
sysOutCharLimit
47+
);
48+
PipedInputStream containerInput = new PipedInputStream();
49+
this.writer = new BufferedWriter(new OutputStreamWriter(new PipedOutputStream(containerInput)));
50+
InputStream containerOutput = dockerService.startAndAttachToContainer(
51+
containerId,
52+
containerInput
53+
);
54+
reader = new BufferedReader(new InputStreamReader(containerOutput));
6355
writer.write(sanitize(startupScript));
6456
writer.newLine();
65-
} catch (IOException e) {
57+
} catch (IOException | InterruptedException e) {
6658
throw new DockerException(e);
6759
}
6860
this.doingOperation = false;
@@ -73,6 +65,10 @@ public Optional<JShellResult> eval(String code) throws DockerException {
7365
return Optional.empty();
7466
}
7567
}
68+
if (isClosed()) {
69+
close();
70+
return Optional.empty();
71+
}
7672
updateLastTimeout();
7773
if(!code.endsWith("\n")) code += '\n';
7874
try {
@@ -86,7 +82,7 @@ public Optional<JShellResult> eval(String code) throws DockerException {
8682
checkContainerOK();
8783

8884
return Optional.of(readResult());
89-
} catch (IOException | NumberFormatException ex) {
85+
} catch (DockerException | IOException | NumberFormatException ex) {
9086
close();
9187
throw new DockerException(ex);
9288
} finally {
@@ -185,27 +181,22 @@ public String id() {
185181

186182
@Override
187183
public void close() {
188-
process.destroyForcibly();
189184
try {
190185
try {
191186
writer.close();
192187
} finally {
193188
reader.close();
194189
}
195-
new ProcessBuilder("docker", "kill", containerName())
196-
.directory(new File(".."))
197-
.start()
198-
.waitFor();
199-
} catch(IOException | InterruptedException ex) {
190+
dockerService.killContainerByName(containerName());
191+
} catch(IOException ex) {
200192
throw new RuntimeException(ex);
201193
}
202-
process = null;
203194
sessionService.notifyDeath(id);
204195
}
205196

206197
@Override
207198
public boolean isClosed() {
208-
return process == null;
199+
return dockerService.isDead(containerName());
209200
}
210201

211202
private void updateLastTimeout() {

JShellAPI/src/main/java/org/togetherjava/jshellapi/service/JShellSessionService.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ public class JShellSessionService {
1818
private Config config;
1919
private StartupScriptsService startupScriptsService;
2020
private ScheduledExecutorService scheduler;
21+
private DockerService dockerService;
2122
private final Map<String, JShellService> jshellSessions = new HashMap<>();
23+
2224
private void initScheduler() {
2325
scheduler = Executors.newSingleThreadScheduledExecutor();
2426
scheduler.scheduleAtFixedRate(() -> {
@@ -74,6 +76,7 @@ private synchronized JShellService createSession(String id, long sessionTimeout,
7476
throw new ResponseStatusException(HttpStatus.TOO_MANY_REQUESTS, "Too many sessions, try again later :(.");
7577
}
7678
JShellService service = new JShellService(
79+
dockerService,
7780
this,
7881
id,
7982
sessionTimeout,
@@ -97,4 +100,9 @@ public void setConfig(Config config) {
97100
public void setStartupScriptsService(StartupScriptsService startupScriptsService) {
98101
this.startupScriptsService = startupScriptsService;
99102
}
103+
104+
@Autowired
105+
public void setDockerService(DockerService dockerService) {
106+
this.dockerService = dockerService;
107+
}
100108
}

JShellAPI/src/main/resources/application.properties

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,8 @@ jshellapi.dockerMaxRamMegaBytes=100
1111
jshellapi.dockerCPUsUsage=0.5
1212

1313
# Internal config
14-
jshellapi.schedulerSessionKillScanRateSeconds=60
14+
jshellapi.schedulerSessionKillScanRateSeconds=60
15+
16+
# Docker service config
17+
jshellapi.dockerResponseTimeout=60
18+
jshellapi.dockerConnectionTimeout=60

0 commit comments

Comments
 (0)