Skip to content

Commit 92aea34

Browse files
committed
ThreadPool working
1 parent 87a4fb4 commit 92aea34

6 files changed

Lines changed: 277 additions & 22 deletions

File tree

networking/Client.java

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
import java.io.BufferedInputStream;
2+
import java.io.BufferedOutputStream;
3+
import java.io.DataInputStream;
4+
import java.io.DataOutputStream;
5+
import java.io.IOException;
6+
import java.net.Socket;
7+
import java.util.concurrent.ExecutorService;
8+
9+
public class Client implements Runnable {
10+
private Socket socket;
11+
private DataInputStream input;
12+
private DataOutputStream output;
13+
private boolean isStopped;
14+
private ExecutorService threadPool;
15+
private Client trackClient;
16+
17+
private int numIter = 0;
18+
19+
Client(ExecutorService threadPool) {
20+
this.threadPool = threadPool;
21+
this.trackClient = this;
22+
}
23+
24+
Client(Socket s, Client trackClient, ExecutorService threadPool) {
25+
this.socket = s;
26+
if (trackClient != null) {
27+
this.trackClient = trackClient;
28+
} else {
29+
this.trackClient = this;
30+
}
31+
this.threadPool = threadPool;
32+
try {
33+
this.input = new DataInputStream(this.getSocket().getInputStream());
34+
} catch (IOException e) {
35+
System.out.println("Error opening input stream for socket: " + this.socket.toString());
36+
e.printStackTrace();
37+
}
38+
try {
39+
this.output = new DataOutputStream(this.getSocket().getOutputStream());
40+
} catch (IOException e) {
41+
System.out.println("Error opening output stream for socket: " + this.socket.toString());
42+
e.printStackTrace();
43+
}
44+
}
45+
46+
public Socket getSocket() {
47+
return this.socket;
48+
}
49+
50+
public int getNumIter() {
51+
return this.numIter;
52+
}
53+
54+
public synchronized void setNumIter(int n) {
55+
this.numIter = n;
56+
}
57+
58+
public synchronized void iterNumIter() {
59+
this.numIter++;
60+
}
61+
62+
@Override
63+
/*
64+
* Listening socket thread for each client (non-Javadoc) ExecutorService
65+
* threadPool
66+
*
67+
* @see java.lang.Runnable#run()
68+
*/
69+
public void run() {
70+
while (!this.isStopped()) {
71+
try {
72+
// TODO: Just keeping track of a number for now
73+
synchronized (this.input) {
74+
this.threadPool.execute(new ClientRead(this.input.readInt(), this.trackClient));
75+
}
76+
} catch (IOException e) {
77+
System.out.print("Error reading from socket: " + this.socket.toString() + ": " + e.toString());
78+
e.printStackTrace();
79+
}
80+
}
81+
}
82+
83+
public synchronized void stop() {
84+
this.isStopped = true;
85+
try {
86+
this.socket.close();
87+
} catch (IOException e) {
88+
throw new RuntimeException("Error closing client socket", e);
89+
}
90+
}
91+
92+
public void write(int data) {
93+
this.threadPool.execute(new ClientWrite(data));
94+
}
95+
96+
private synchronized boolean isStopped() {
97+
return this.isStopped;
98+
}
99+
100+
/**
101+
* Handles data from server to local client
102+
*
103+
* @author sworley
104+
*
105+
*/
106+
private class ClientRead implements Runnable {
107+
private int data;
108+
private Client client;
109+
110+
public ClientRead(int data, Client client) {
111+
this.data = data;
112+
this.client = client;
113+
}
114+
115+
@Override
116+
public void run() {
117+
this.client.setNumIter(this.data);
118+
}
119+
120+
}
121+
122+
private class ClientWrite implements Runnable {
123+
private int data;
124+
125+
public ClientWrite(int data) {
126+
this.data = data;
127+
}
128+
129+
@Override
130+
public void run() {
131+
synchronized (this.getClient().output) {
132+
try {
133+
this.getClient().output.writeInt(this.data);
134+
} catch (IOException e) {
135+
System.out.println("Error writing to socket: " + this.getClient().socket.toString());
136+
e.printStackTrace();
137+
}
138+
}
139+
}
140+
141+
public Client getClient() {
142+
return Client.this;
143+
}
144+
145+
}
146+
}
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,36 @@
11
import java.io.IOException;
22
import java.net.ServerSocket;
33
import java.net.Socket;
4+
import java.util.concurrent.CopyOnWriteArrayList;
45
import java.util.concurrent.ExecutorService;
56

67
// From http://tutorials.jenkov.com/java-multithreaded-servers/thread-pooled-server.html
78

8-
public class ThreadPoolServer implements Runnable {
9+
public class Server implements Runnable {
910

1011
protected ServerSocket serverSocket;
1112
protected Thread runningThread;
12-
protected ExecutorService threadPool;
13-
13+
protected CopyOnWriteArrayList<Client> clients;
14+
private Client localClient;
1415

1516
protected int serverPort = 9000;
1617
protected boolean isStopped = false;
17-
18-
public ThreadPoolServer(int port, ExecutorService threadPool) {
18+
private ExecutorService threadPool;
19+
20+
public Server(int port, ExecutorService threadPool) {
1921
this.serverPort = port;
2022
this.threadPool = threadPool;
23+
this.clients = new CopyOnWriteArrayList<Client>();
24+
this.localClient = new Client(threadPool);
2125
}
2226

2327
@Override
2428
public void run() {
25-
synchronized(this) {
29+
synchronized (this) {
2630
this.runningThread = Thread.currentThread();
2731
}
2832
this.openServerSocket();
29-
while (! this.isStopped()) {
33+
while (!this.isStopped()) {
3034
Socket clientSocket = null;
3135
try {
3236
clientSocket = this.serverSocket.accept();
@@ -36,21 +40,35 @@ public void run() {
3640
}
3741
throw new RuntimeException("Error accepting client connection" + e);
3842
}
39-
this.threadPool.execute(new WorkerRunnable(clientSocket, "Thread Pooled Server"));
43+
Client client = new Client(clientSocket, this.localClient, this.threadPool);
44+
synchronized (this.clients) {
45+
this.clients.add(client);
46+
}
47+
new Thread(client).start();
4048
}
41-
this.threadPool.shutdown();
4249
System.out.println("Server Stopped");
4350
}
4451

4552
public synchronized void stop() {
4653
this.isStopped = true;
54+
for (Client client : clients) {
55+
client.stop();
56+
}
4757
try {
4858
this.serverSocket.close();
4959
} catch (IOException e) {
5060
throw new RuntimeException("Error closing server", e);
5161
}
5262
}
5363

64+
public Client getLocalClient() {
65+
return this.localClient;
66+
}
67+
68+
public void updateClients() {
69+
this.threadPool.execute(new UpdateClients(this.clients, this.localClient));
70+
}
71+
5472
private synchronized boolean isStopped() {
5573
return this.isStopped;
5674
}
@@ -62,5 +80,27 @@ private void openServerSocket() {
6280
throw new RuntimeException("Cannot open port " + this.serverPort + ":" + e);
6381
}
6482
}
83+
84+
/*
85+
* TODO: For now just updating all clients with numIter
86+
*/
87+
private class UpdateClients implements Runnable {
88+
89+
private CopyOnWriteArrayList<Client> clients;
90+
private Client localState;
91+
92+
public UpdateClients(CopyOnWriteArrayList<Client> clients, Client localState) {
93+
this.clients = clients;
94+
this.localState = localState;
95+
}
96+
97+
@Override
98+
public void run() {
99+
for (Client client : this.clients) {
100+
client.write(this.localState.getNumIter());
101+
}
102+
}
103+
104+
}
65105

66106
}

networking/ThreadPoolClient.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,10 @@
11

2-
public class ThreadPoolClient {
2+
public class ThreadPoolClient implements Runnable {
3+
4+
@Override
5+
public void run() {
6+
// TODO Auto-generated method stub
7+
8+
}
39

410
}

networking/UpdateClients.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
import java.util.concurrent.CopyOnWriteArrayList;
2+
import java.util.concurrent.ExecutorService;
3+
4+
public class UpdateClients implements Runnable {
5+
6+
protected CopyOnWriteArrayList<Client> clientArray;
7+
protected ExecutorService threadPool;
8+
9+
UpdateClients(CopyOnWriteArrayList<Client> clientArray, ExecutorService threadPool) {
10+
this.clientArray = clientArray;
11+
this.threadPool = threadPool;
12+
}
13+
14+
@Override
15+
public void run() {
16+
synchronized (clientArray) {
17+
for (Client client : this.clientArray) {
18+
19+
}
20+
}
21+
}
22+
23+
}

networking/WorkerRunnable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ public WorkerRunnable(Socket clientSocket, String serverText) {
1717

1818
@Override
1919
public void run() {
20-
try {
20+
try {
2121
InputStream input = clientSocket.getInputStream();
2222
OutputStream output = clientSocket.getOutputStream();
2323
long time = System.currentTimeMillis();

0 commit comments

Comments
 (0)