Producer/Consumer Queue
A simple Wait/Pulse application is a producer-consumer
queue – the structure we wrote earlier using an AutoResetEvent. A producer
enqueues tasks (typically on the main thread), while one or more consumers
running on worker threads pick off and execute the tasks one by one.
In this example, we'll use a string to represent a task.
Our task queue then looks like this:
Queue<string> taskQ = new Queue<string>();
Because the queue will be used on multiple threads, we
must wrap all statements that read or write to the queue in a lock. Here's
how we enqueue a task:
lock (locker) {
taskQ.Enqueue ("my task");
Monitor.PulseAll (locker); // We're altering a blocking condition
}
Because we're modifying a potential blocking condition,
we must pulse. We call PulseAll rather than Pulse because we're going
to allow for multiple consumers. More than one thread may be waiting.
We want the workers to block while there's nothing to
do, in other words, when there are no items on the queue. Hence our blocking
condition is taskQ.Count==0. Here's a Wait statement that performs exactly
this:
lock (locker)
while (taskQ.Count == 0) Monitor.Wait (locker);
The next step is for the worker to dequeue the task and execute it:
lock (locker)
while (taskQ.Count == 0) Monitor.Wait (locker);
string task;
lock (locker)
task = taskQ.Dequeue();
This logic, however, is not thread-safe: we're basing
a decision to dequeue upon stale information – obtained in a prior lock
statement. Consider what would happen if we started two consumer threads
concurrently, with a single item already on the queue. It's possible that
neither thread would enter the while loop to block – both seeing a single
item on the queue. They'd both then attempt to dequeue the same item,
throwing an exception in the second instance! To fix this, we simply hold
the lock a bit longer – until we've finished interacting with the queue:
string task;
lock (locker) {
while (taskQ.Count == 0) Monitor.Wait (locker);
task = taskQ.Dequeue();
}
(We don't need to call Pulse after dequeuing, as no consumer
can ever unblock by there being fewer items on the queue).
Once the task is dequeued, there's no further requirement
to keep the lock. Releasing it at this point allows the consumer to perform
a possibly time-consuming task without unnecessarily blocking other threads.
Here's the complete program. As with the AutoResetEvent
version, we enqueue a null task to signal a consumer to exit (after finishing
any outstanding tasks). Because we're supporting multiple consumers, we
must enqueue one null task per consumer to completely shut down the queue:
Wait/Pulse Boilerplate #2: Producer/Consumer Queue
using System;
using System.Threading;
using System.Collections.Generic;
public class TaskQueue : IDisposable
{
object locker = new object();
Thread[] workers;
Queue<string> taskQ = new Queue<string>();
public TaskQueue (int workerCount)
{
workers = new Thread [workerCount];
// Create and start a separate thread for each worker
for (int i = 0; i < workerCount; i++)
(workers [i] = new Thread (Consume)).Start();
}
public void Dispose() {
// Enqueue one null task per worker to make each exit.
foreach (Thread worker in workers) EnqueueTask (null);
foreach (Thread worker in workers) worker.Join();
}
public void EnqueueTask (string task) {
lock (locker) {
taskQ.Enqueue (task);
Monitor.PulseAll (locker);
}
}
void Consume() {
while (true) {
string task;
lock (locker) {
while (taskQ.Count == 0) Monitor.Wait (locker);
task = taskQ.Dequeue();
}
if (task == null) return; // This signals our exit
Console.Write (task);
Thread.Sleep (1000); // Simulate time-consuming task
}
}
}
Here's a Main method that starts a task queue, specifying
two concurrent consumer threads, and then enqueues ten tasks to be shared
amongst the two consumers:
static void Main() {
using (TaskQueue q = new TaskQueue (2)) {
for (int i = 0; i < 10; i++)
q.EnqueueTask (" Task" + i);
Console.WriteLine ("Enqueued 10 tasks");
Console.WriteLine ("Waiting for tasks to complete...");
}
// Exiting the using statement runs TaskQueue's Dispose method, which
// shuts down the consumers, after all outstanding tasks are completed.
Console.WriteLine ("\r\nAll tasks done!");
}
Enqueued 10 tasks
Waiting for tasks to complete...
Task1 Task0 (pause...) Task2 Task3 (pause...) Task4 Task5 (pause...)
Task6 Task7 (pause...) Task8 Task9 (pause...)
All tasks done!
Consistent with our design pattern, if we remove PulseAll
and replace Wait with lock toggling, we'll get the same output.
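To illustrate (a sketch – assuming "lock toggling" refers to the earlier spin-based pattern of repeatedly releasing and re-acquiring the lock), the consumer's wait could be rewritten like this, trading pulses for periodic re-checking:

string task = null;
while (task == null) {
  lock (locker)                          // Take the lock just long enough
    if (taskQ.Count > 0)                 // to test the condition...
      task = taskQ.Dequeue();
  if (task == null) Thread.Sleep (10);   // ...then release it and re-check
}

This works, but wastes CPU time and adds latency compared to Wait and Pulse.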
Pulse Economy
Let's revisit the producer enqueuing a task:
lock (locker) {
taskQ.Enqueue (task);
Monitor.PulseAll (locker);
}
Strictly speaking, we could economize by pulsing only
when there's a possibility of freeing a blocked worker:
lock (locker) {
taskQ.Enqueue (task);
if (taskQ.Count <= workers.Length) Monitor.PulseAll (locker);
}
We'd be saving very little, though, since pulsing typically
takes under a microsecond and incurs no overhead on busy workers –
they ignore it anyway! It's a good policy with multi-threaded code to
cull any unnecessary logic: an intermittent bug due to a silly mistake
is a heavy price to pay for a one-microsecond saving! To demonstrate,
this is all it would take to introduce an intermittent "stuck worker"
bug that would most likely evade initial testing (spot the difference):
lock (locker) {
taskQ.Enqueue (task);
if (taskQ.Count < workers.Length) Monitor.PulseAll (locker);
}
Pulsing unconditionally protects us from this type of
bug.
If in doubt, Pulse. Rarely can you go wrong
by pulsing, within this design pattern.
Pulse or PulseAll?
This example comes with further pulse economy potential.
After enqueuing a task, we could call Pulse instead of PulseAll and nothing
would break.
Let's recap the difference: with Pulse, a maximum of
one thread can awake (and re-check its while-loop blocking condition);
with PulseAll, all waiting threads will awake (and re-check their blocking
conditions). If we're enqueuing a single task, only one worker can handle
it, so we need only wake up one worker with a single Pulse. It's rather
like having a class of sleeping children – if there's just one ice-cream
there's no point in waking them all to queue for it!
In our example we start only two consumer threads, so
we would have little to gain. But if we started ten consumers, we might
benefit slightly in choosing Pulse over PulseAll. It would mean, though,
that if we enqueued multiple tasks, we would need to Pulse multiple times.
This can be done within a single lock statement, as follows:
lock (locker) {
taskQ.Enqueue ("task 1");
taskQ.Enqueue ("task 2");
Monitor.Pulse (locker); // "Signal up to two
Monitor.Pulse (locker); // waiting threads."
}
The price of one Pulse too few is a stuck worker. This
will usually manifest as an intermittent bug, because it will crop up
only when a consumer is in a Waiting state. Hence one could extend the
previous maxim "if in doubt, Pulse", to "if in doubt, PulseAll!"
A possible exception to the rule might arise if evaluating
the blocking condition was unusually time-consuming.
Using Wait Timeouts
Sometimes it may be unreasonable or impossible to Pulse
whenever an unblocking condition arises. An example might be if a blocking
condition involves calling a method that derives information from periodically
querying a database. If latency is not an issue, the solution is simple:
one can specify a timeout when calling Wait, as follows:
lock (locker)
while ( blocking condition )
Monitor.Wait (locker, timeout);
This forces the blocking condition to be re-checked,
at a minimum, at a regular interval specified by the timeout, as well
as immediately upon receiving a pulse. The simpler the blocking condition,
the smaller the timeout can be without causing inefficiency.
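For instance, a worker whose blocking condition depends on an external check might re-evaluate it every second (DataIsReady here is a hypothetical method standing in for, say, a database query):

lock (locker) {
  while (!DataIsReady())                             // Hypothetical check
    Monitor.Wait (locker, TimeSpan.FromSeconds (1)); // Re-check at least
}                                                    // every second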
The same system works equally well if the pulse is absent
due to a bug in the program! It can be worth adding a timeout to all Wait
commands in programs where synchronization is particularly complex – as
an ultimate backup for obscure pulsing errors. It also provides a degree
of bug-immunity if the program is modified later by someone not on the
Pulse!
Races and Acknowledgement
Let's say we want to signal a worker five times in a row:
class Race {
static object locker = new object();
static bool go;
static void Main() {
new Thread (SaySomething).Start();
for (int i = 0; i < 5; i++) {
lock (locker) { go = true; Monitor.Pulse (locker); }
}
}
static void SaySomething() {
for (int i = 0; i < 5; i++) {
lock (locker) {
while (!go) Monitor.Wait (locker);
go = false;
}
Console.WriteLine ("Wassup?");
}
}
}
Expected Output:
Wassup?
Wassup?
Wassup?
Wassup?
Wassup?
Actual Output:
Wassup? (printed anywhere from one to five times; if the worker misses
a signal, it remains blocked and the program never terminates)
This program is flawed: the for loop in the main thread
can free-wheel right through its five iterations any time the worker doesn't
hold the lock. Possibly before the worker even starts! The Producer/Consumer
example didn't suffer from this problem because if the main thread got
ahead of the worker, each request would simply queue up. But in this case,
we need the main thread to block at each iteration if the worker's still
busy with a previous task.
A simple solution is for the main thread to wait after
each cycle until the go flag is cleared by the worker. This, then, requires
that the worker call Pulse after clearing the go flag:
class Acknowledged {
static object locker = new object();
static bool go;
static void Main() {
new Thread (SaySomething).Start();
for (int i = 0; i < 5; i++) {
lock (locker) { go = true; Monitor.Pulse (locker); }
lock (locker) { while (go) Monitor.Wait (locker); }
}
}
static void SaySomething() {
for (int i = 0; i < 5; i++) {
lock (locker) {
while (!go) Monitor.Wait (locker);
go = false; Monitor.Pulse (locker); // Worker must Pulse
}
Console.WriteLine ("Wassup?");
}
}
}
Wassup? (repeated five times)
An important feature of such a program is that the worker
releases its lock before performing its potentially time-consuming job
(this would happen in place of where we're calling Console.WriteLine).
This ensures the instigator is not unduly blocked while the worker performs
the task for which it has been signaled (and is blocked only if the worker
is busy with a previous task).
In this example, only one thread (the main thread) signals
the worker to perform a task. If multiple threads were to signal the worker
– using our Main method's logic – we would come unstuck. Two signaling
threads could each execute the following line of code in sequence:
lock (locker) { go = true; Monitor.Pulse (locker); }
resulting in the second signal being lost if the worker hadn't finished
processing the first. We can make our design
robust in this scenario by using a pair of flags – a "ready"
flag as well as a "go" flag. The "ready" flag indicates
that the worker is able to accept a fresh task; the "go" flag
is an instruction to proceed, as before. This is analogous to an earlier
example that performed the same thing using two AutoResetEvents, except
that this version is more extensible. Here's the pattern, refactored with
instance fields:
Wait/Pulse Boilerplate #3: Two-way Signaling
public class Acknowledged {
object locker = new object();
bool ready;
bool go;
public void NotifyWhenReady()
{
lock (locker) {
// Wait if the worker's already busy with a previous job
while (!ready) Monitor.Wait (locker);
ready = false;
go = true;
Monitor.PulseAll (locker);
}
}
public void AcknowledgedWait()
{
// Indicate that we're ready to process a request
lock (locker) { ready = true; Monitor.Pulse (locker); }
lock (locker) {
while (!go) Monitor.Wait (locker); // Wait for a "go" signal
go = false; Monitor.PulseAll (locker); // Acknowledge signal
}
Console.WriteLine ("Wassup?"); // Perform task
}
}
To demonstrate, we'll start two concurrent threads, each of which
will notify the worker five times. Meanwhile, the main thread will
wait for ten notifications:
public class Test {
static Acknowledged a = new Acknowledged();
static void Main() {
new Thread (Notify5).Start(); // Run two concurrent
new Thread (Notify5).Start(); // notifiers...
Wait10(); // ... and one waiter.
}
static void Notify5() {
for (int i = 0; i < 5; i++)
a.NotifyWhenReady();
}
static void Wait10() {
for (int i = 0; i < 10; i++)
a.AcknowledgedWait();
}
}
Wassup?
Wassup?
Wassup?
(repeated ten times)
In the NotifyWhenReady method, the ready flag is cleared before
exiting the lock statement. This is vitally important: it prevents two
notifiers signaling sequentially without re-checking the flag. For the
sake of simplicity, we also set the go flag and call PulseAll in the same
lock statement – although we could just as well put this pair of statements
in a separate lock and nothing would break.
Simulating Wait Handles
You might have noticed a pattern in the previous example:
both waiting loops have the following structure:
lock (locker) {
while (!flag) Monitor.Wait (locker);
flag = false;
...
}
where flag is set to true in another thread. This is,
in effect, mimicking an AutoResetEvent. If we omitted flag=false, we'd
then have a ManualResetEvent. Using an integer field, Pulse and Wait can
also be used to mimic a Semaphore. In fact the only Wait Handle we can't
mimic with Pulse and Wait is a Mutex, since this functionality is provided
by the lock statement.
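To illustrate (a sketch that's not from the original text – it wraps the pattern in a hypothetical class, with an integer count taking the place of a boolean flag), here's a semaphore mimicked with Wait and Pulse:

public class PulseSemaphore {
  object locker = new object();
  int count;                              // Number of free slots

  public PulseSemaphore (int initialCount) { count = initialCount; }

  public void Wait() {                    // Equivalent of WaitOne
    lock (locker) {
      while (count == 0) Monitor.Wait (locker);
      count--;
    }
  }

  public void Release() {
    lock (locker) {
      count++;
      Monitor.Pulse (locker);             // We're altering a blocking condition
    }
  }
}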
Simulating the static methods that work across multiple
Wait Handles is in most cases easy. The equivalent of calling WaitAll
across multiple EventWaitHandles is nothing more than a blocking condition
that incorporates all the flags used in place of the Wait Handles:
lock (locker) {
while (!(flag1 && flag2 && flag3 ...)) Monitor.Wait (locker);
This can be particularly useful given that WaitAll is
in most cases unusable due to COM legacy issues. Simulating WaitAny is
simply a matter of replacing the && operator with the || operator.
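In other words, a WaitAny-style wait blocks only while no flag is set (shown as a fragment in the same style as above):

lock (locker) {
  while (!(flag1 || flag2 || flag3 ...)) Monitor.Wait (locker);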
SignalAndWait is trickier. Recall that this method signals
one handle while waiting on another in an atomic operation. We have a
situation analogous to a distributed database transaction – we need a
two-phase commit! Assuming we wanted to signal flagA while waiting on
flagB, we'd have to divide each flag into two, resulting in code that
might look something like this:
lock (locker) {
flagAphase1 = true;
Monitor.Pulse (locker);
while (!flagBphase1) Monitor.Wait (locker);
flagAphase2 = true;
Monitor.Pulse (locker);
while (!flagBphase2) Monitor.Wait (locker);
}
perhaps with additional "rollback" logic to
retract flagAphase1 if the first Wait statement threw an exception as
a result of being interrupted or aborted. This is one situation where
Wait Handles are way easier! True atomic signal-and-waiting, however,
is actually an unusual requirement.
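For completeness, such rollback logic might look roughly like this (a sketch; it assumes interruption or abortion surfaces as an exception thrown from Wait, as described earlier in this series):

lock (locker) {
  flagAphase1 = true;
  Monitor.Pulse (locker);
  try {
    while (!flagBphase1) Monitor.Wait (locker);
  }
  catch {                        // Interrupted or aborted while waiting:
    flagAphase1 = false;         // retract the first-phase signal,
    Monitor.PulseAll (locker);   // wake the other party to re-check,
    throw;                       // and let the exception propagate
  }
  // ...then the second phase, as before
}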
Wait Rendezvous
Just as WaitHandle.SignalAndWait can be used to rendezvous
a pair of threads, so can Wait and Pulse. In the following example, one
could say we simulate two ManualResetEvents (in other words, we define
two boolean flags!) and then perform reciprocal signal-and-waiting by
setting one flag while waiting for the other. In this case we don't need
true atomicity in signal-and-waiting, so can avoid the need for a "two-phase
commit". As long as we set our flag true and Wait in the same lock
statement, the rendezvous will work:
class Rendezvous {
static object locker = new object();
static bool signal1, signal2;
static void Main() {
// Get each thread to sleep a random amount of time.
Random r = new Random();
new Thread (Mate).Start (r.Next (10000));
Thread.Sleep (r.Next (10000));
lock (locker) {
signal1 = true;
Monitor.Pulse (locker);
while (!signal2) Monitor.Wait (locker);
}
Console.Write ("Mate! ");
}
// This is called via a ParameterizedThreadStart
static void Mate (object delay) {
Thread.Sleep ((int) delay);
lock (locker) {
signal2 = true;
Monitor.Pulse (locker);
while (!signal1) Monitor.Wait (locker);
}
Console.Write ("Mate! ");
}
}
Mate! Mate! (almost in unison)
Wait and Pulse vs. Wait Handles
Because Wait and Pulse are the most flexible of the synchronization
constructs, they can be used in almost any situation. Wait Handles, however,
have two advantages:
- they have the capability of working across multiple processes
- they are simpler to understand, and harder to break
Additionally, Wait Handles are more interoperable in
the sense that they can be passed around via method arguments. This
technique is usefully employed in thread pooling.
In terms of performance, Wait and Pulse have a slight
edge, if one follows the suggested design pattern for waiting, that is:
lock (locker)
while ( blocking condition ) Monitor.Wait (locker);
and the blocking condition happens to be false from the
outset. The only overhead then incurred is that of taking out the lock
(tens of nanoseconds) versus the few microseconds it would take to call
WaitHandle.WaitOne. Of course, this assumes the lock is uncontended; even
the briefest lock contention would be more than enough to even things
out; frequent lock contention would make a Wait Handle faster!
Given the potential for variation through different CPUs, operating
systems, CLR versions, and program logic; and that in any case a few
microseconds is unlikely to be of any consequence before a Wait statement,
performance may be a dubious reason to choose Wait and Pulse over Wait
Handles, or vice versa.
A sensible guideline is to use a Wait Handle where a
particular construct lends itself naturally to the job, otherwise use
Wait and Pulse.