Here, we improve upon the example. In the last post, I mentioned that there were still fixes to be made, and today we will discuss one of them.
Below is a copy of the code from the previous lesson. Take a closer look at the run() method of removeThread, in particular the while loop that polls getElementsInQueue() before calling dequeue():
public class SharedQueueSafe
{
    private int[] queue;
    private int head, tail, num_elems;
    private int size;

    public SharedQueueSafe(int size)
    {
        this.size = size;
        queue = new int[size];
        for (int i=0; i<size; i++)
            queue[i] = -1;
        head = tail = num_elems = 0;
    }

    public synchronized void enqueue(int data)
    {
        num_elems++;
        System.out.println("inserting " + data + " into the queue");
        try
        {
            Thread.sleep(100);
        }
        catch (InterruptedException ie)
        {
        }
        queue[head] = data;
        head = (head + 1) % size;
    }

    public synchronized int dequeue()
    {
        num_elems--;
        int ret = queue[tail];
        tail = (tail + 1) % size;
        return ret;
    }

    public synchronized int getElementsInQueue()
    {
        return num_elems;
    }

    public static void main(String[] args)
    {
        SharedQueueSafe sharedQueue = new SharedQueueSafe(10);
        Thread insertThread = new Thread()
        {
            public void run()
            {
                sharedQueue.enqueue(12345);
            }
        };
        Thread removeThread = new Thread()
        {
            public void run()
            {
                while (sharedQueue.getElementsInQueue() <= 0)
                    ;
                int element = sharedQueue.dequeue();
                System.out.println("retrieved " + element);
            }
        };
        insertThread.start();
        try
        {
            Thread.sleep(50);
        }
        catch (InterruptedException ie)
        {
        }
        removeThread.start();
    }
}
You will notice that the thread repeatedly checks whether there is at least one element in the queue, and once there is, it calls the dequeue() method. The question is: can we guarantee that the element will still be in the queue between the two calls? Each call is synchronized on its own, but the lock is released in between. So what if another thread calls dequeue() in that gap, intercepting the element first? This is certainly possible...
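To make the gap between the two calls concrete, here is a minimal sketch that replays the bad ordering step by step in a single thread, so the outcome does not depend on timing. The names CheckThenActDemo, TinyQueue and replay() are made up for this illustration, and TinyQueue is only a stripped-down stand-in for the queue in the listing (no sleep, no printing), assuming two consumers A and B that both follow the check-then-dequeue pattern.

```java
import java.util.Arrays;

public class CheckThenActDemo
{
    // Hypothetical stand-in for the queue in the listing, without the check in dequeue().
    static class TinyQueue
    {
        private final int[] queue;
        private int head, tail, numElems;

        TinyQueue(int size)
        {
            queue = new int[size];
            Arrays.fill(queue, -1); // -1 marks an empty slot, as in the original
        }

        synchronized void enqueue(int data)
        {
            numElems++;
            queue[head] = data;
            head = (head + 1) % queue.length;
        }

        synchronized int dequeue()
        {
            numElems--;
            int ret = queue[tail];
            tail = (tail + 1) % queue.length;
            return ret;
        }

        synchronized int getElementsInQueue()
        {
            return numElems;
        }
    }

    // Replays the problematic ordering and returns
    // { value B retrieved, value A retrieved, final element count }.
    static int[] replay()
    {
        TinyQueue q = new TinyQueue(10);
        q.enqueue(12345);

        // Both consumers pass the check before either one dequeues.
        boolean aSawElement = q.getElementsInQueue() > 0;
        boolean bSawElement = q.getElementsInQueue() > 0;

        int b = bSawElement ? q.dequeue() : Integer.MIN_VALUE; // B takes the real element
        int a = aSawElement ? q.dequeue() : Integer.MIN_VALUE; // A reads an empty slot

        return new int[] { b, a, q.getElementsInQueue() };
    }

    public static void main(String[] args)
    {
        int[] r = replay();
        System.out.println("B retrieved " + r[0]);
        System.out.println("A retrieved " + r[1]);
        System.out.println("elements left: " + r[2]);
    }
}
```

In this replay, B gets 12345, while A gets the -1 placeholder and pushes the element count to -1. With real threads the same ordering would only happen occasionally, which is exactly what makes this kind of bug hard to spot.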
Solution? I'd say we fix this issue by having the dequeue() method itself check the number of elements in the queue. Since dequeue() is synchronized, the check and the removal happen under the same lock, so if there is an element, the caller is guaranteed to retrieve it. What if there is no element in the queue? Then dequeue() returns immediately by throwing an exception stating that there is no element to retrieve. Thus, we need to modify the run() method of removeThread to call dequeue() in a loop until it no longer throws an exception. Take a look at the code below for the implementation.
public class SharedQueueSafe
{
    private int[] queue;
    private int head, tail, num_elems;
    private int size;

    class NoElementException extends Exception
    {
        public NoElementException(String message)
        {
            super(message);
        }
    }

    public SharedQueueSafe(int size)
    {
        this.size = size;
        queue = new int[size];
        for (int i=0; i<size; i++)
            queue[i] = -1;
        head = tail = num_elems = 0;
    }

    public synchronized void enqueue(int data)
    {
        num_elems++;
        System.out.println("inserting " + data + " into the queue");
        try
        {
            Thread.sleep(100);
        }
        catch (InterruptedException ie)
        {
        }
        queue[head] = data;
        head = (head + 1) % size;
    }

    public synchronized int dequeue() throws NoElementException
    {
        if (num_elems <= 0)
            throw new NoElementException("queue is empty");
        num_elems--;
        int ret = queue[tail];
        tail = (tail + 1) % size;
        return ret;
    }

    public static void main(String[] args)
    {
        SharedQueueSafe sharedQueue = new SharedQueueSafe(10);
        Thread insertThread = new Thread()
        {
            public void run()
            {
                sharedQueue.enqueue(12345);
            }
        };
        Thread removeThread = new Thread()
        {
            public void run()
            {
                int element;
                while (true)
                {
                    try
                    {
                        element = sharedQueue.dequeue();
                        break;
                    }
                    catch (NoElementException e)
                    {
                    }
                }
                System.out.println("retrieved " + element);
            }
        };
        removeThread.start();
        try
        {
            Thread.sleep(50);
        }
        catch (InterruptedException ie)
        {
        }
        insertThread.start();
    }
}
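As a quick sanity check of the idea, the sketch below lets two consumer threads compete for a single element, with the emptiness check done inside a synchronized dequeue(). SingleWinnerDemo, TinyQueue and race() are hypothetical names for this illustration only, and TinyQueue is a reduced stand-in for the queue in the listing, not the original class. Each consumer tries exactly once instead of looping, so that the loser is visible.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class SingleWinnerDemo
{
    static class NoElementException extends Exception
    {
        NoElementException(String message)
        {
            super(message);
        }
    }

    // Hypothetical stand-in for the queue in the listing, with the check inside dequeue().
    static class TinyQueue
    {
        private final int[] queue;
        private int head, tail, numElems;

        TinyQueue(int size)
        {
            queue = new int[size];
        }

        synchronized void enqueue(int data)
        {
            numElems++;
            queue[head] = data;
            head = (head + 1) % queue.length;
        }

        // Check and removal happen under the same lock.
        synchronized int dequeue() throws NoElementException
        {
            if (numElems <= 0)
                throw new NoElementException("queue is empty");
            numElems--;
            int ret = queue[tail];
            tail = (tail + 1) % queue.length;
            return ret;
        }

        synchronized int getElementsInQueue()
        {
            return numElems;
        }
    }

    // Returns { successful dequeues, failed dequeues, final element count }.
    static int[] race()
    {
        TinyQueue q = new TinyQueue(10);
        q.enqueue(12345);

        AtomicInteger successes = new AtomicInteger();
        AtomicInteger failures = new AtomicInteger();

        // Each consumer makes a single attempt.
        Runnable consumer = () -> {
            try
            {
                q.dequeue();
                successes.incrementAndGet();
            }
            catch (NoElementException e)
            {
                failures.incrementAndGet();
            }
        };

        Thread a = new Thread(consumer);
        Thread b = new Thread(consumer);
        a.start();
        b.start();
        try
        {
            a.join();
            b.join();
        }
        catch (InterruptedException ie)
        {
            throw new IllegalStateException(ie);
        }
        return new int[] { successes.get(), failures.get(), q.getElementsInQueue() };
    }

    public static void main(String[] args)
    {
        int[] r = race();
        System.out.println("successes: " + r[0] + ", failures: " + r[1] + ", elements left: " + r[2]);
    }
}
```

Whichever thread acquires the lock first takes the element; the other one sees an empty queue and gets the exception, so the element count never drops below zero.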
The modified version of SharedQueueSafe is certainly better than the previous one, but we are not done yet. There are still a few more fixes to be made, and we will tackle them one by one in subsequent posts!