Thread in C# - Part 17: Wait and Pulse (2)  
 

(Post 07/12/2007) A simple Wait/Pulse application is a producer-consumer queue – the structure we wrote earlier using an AutoResetEvent. A producer enqueues tasks (typically on the main thread), while one or more consumers running on worker threads pick off and execute the tasks one by one.

Producer/Consumer Queue

A simple Wait/Pulse application is a producer-consumer queue – the structure we wrote earlier using an AutoResetEvent. A producer enqueues tasks (typically on the main thread), while one or more consumers running on worker threads pick off and execute the tasks one by one.

In this example, we'll use a string to represent a task. Our task queue then looks like this:

Queue<string> taskQ = new Queue<string>();

Because the queue will be used on multiple threads, we must wrap all statements that read or write to the queue in a lock. Here's how we enqueue a task:

lock (locker) {
taskQ.Enqueue ("my task");
Monitor.PulseAll (locker); // We're altering a blocking condition
}

Because we're modifying a potential blocking condition, we must pulse. We call PulseAll rather than Pulse because we're going to allow for multiple consumers. More than one thread may be waiting.

We want the workers to block while there's nothing to do, in other words, when there are no items on the queue. Hence our blocking condition is taskQ.Count==0. Here's a Wait statement that performs exactly this:

lock (locker)
while (taskQ.Count == 0) Monitor.Wait (locker);
The next step is for the worker to dequeue the task and execute it:
lock (locker)
while (taskQ.Count == 0) Monitor.Wait (locker);

string task;
lock (locker)
task = taskQ.Dequeue();

This logic, however, is not thread-safe: we've basing a decision to dequeue upon stale information – obtained in a prior lock statement. Consider what would happen if we started two consumer threads concurrently, with a single item already on the queue. It's possible that neither thread would enter the while loop to block – both seeing a single item on the queue. They'd both then attempt to dequeue the same item, throwing an exception in the second instance! To fix this, we simply hold the lock a bit longer – until we've finished interacting with the queue:

string task;
lock (locker) {
while (taskQ.Count == 0) Monitor.Wait (locker);
task = taskQ.Dequeue();
}

(We don't need to call Pulse after dequeuing, as no consumer can ever unblock by there being fewer items on the queue).

Once the task is dequeued, there's no further requirement to keep the lock. Releasing it at this point allows the consumer to perform a possibly time-consuming task without unnecessary blocking other threads.

Here's the complete program. As with the AutoResetEvent version, we enqueue a null task to signal a consumer to exit (after finishing any outstanding tasks). Because we're supporting multiple consumers, we must enqueue one null task per consumer to completely shut down the queue:

Wait/Pulse Boilerplate #2: Producer/Consumer Queue

using System;
using System.Threading;
using System.Collections.Generic;

public class TaskQueue : IDisposable {
object locker = new object();
Thread[] workers;
Queue<string> taskQ = new Queue<string>();

public TaskQueue (int workerCount) {
workers = new Thread [workerCount];

// Create and start a separate thread for each worker
for (int i = 0; i < workerCount; i++)
(workers [i] = new Thread (Consume)).Start();
}

public void Dispose() {
// Enqueue one null task per worker to make each exit.
foreach (Thread worker in workers) EnqueueTask (null);
foreach (Thread worker in workers) worker.Join();
}

public void EnqueueTask (string task) {
lock (locker) {
taskQ.Enqueue (task);
Monitor.PulseAll (locker);
}
}

void Consume() {
while (true) {
string task;
lock (locker) {
while (taskQ.Count == 0) Monitor.Wait (locker);
task = taskQ.Dequeue();
}
if (task == null) return; // This signals our exit
Console.Write (task);
Thread.Sleep (1000); // Simulate time-consuming task
}
}
}

Here's a Main method that starts a task queue, specifying two concurrent consumer threads, and then enqueues ten tasks to be shared amongst the two consumers:

static void Main() {
using (TaskQueue q = new TaskQueue (2)) {
for (int i = 0; i < 10; i++)
q.EnqueueTask (" Task" + i);

Console.WriteLine ("Enqueued 10 tasks");
Console.WriteLine ("Waiting for tasks to complete...");
}
// Exiting the using statement runs TaskQueue's Dispose method, which
// shuts down the consumers, after all outstanding tasks are completed.
Console.WriteLine ("\r\nAll tasks done!");
}

 

Enqueued 10 tasks
Waiting for tasks to complete...
Task1 Task0 (pause...) Task2 Task3 (pause...) Task4 Task5 (pause...)
Task6 Task7 (pause...) Task8 Task9 (pause...)
All tasks done!

Consistent with our design pattern, if we remove PulseAll and replace Wait with lock toggling, we'll get the same output.

Pulse Economy

Let's revisit the producer enqueuing a task:

lock (locker) {
taskQ.Enqueue (task);
Monitor.PulseAll (locker);
}

Strictly speaking, we could economize by pulsing only when there's a possibility of a freeing a blocked worker:

lock (locker) {
taskQ.Enqueue (task);
if (taskQ.Count <= workers.Length) Monitor.PulseAll (locker);
}

We'd be saving very little, though, since pulsing typically takes under a microsecond, and incurs no overhead on busy workers – since they ignore it anyway! It's a good policy with multi-threaded code to cull any unnecessary logic: an intermittent bug due to a silly mistake is a heavy price to pay for a one-microsecond saving! To demonstrate, this is all it would take to introduce an intermittent "stuck worker" bug that would most likely evade initial testing (spot the difference):

lock (locker) {
taskQ.Enqueue (task);
if (taskQ.Count < workers.Length) Monitor.PulseAll (locker);
}

Pulsing unconditionally protects us from this type of bug.

If in doubt, Pulse. Rarely can you go wrong by pulsing, within this design pattern.

Pulse or PulseAll?

This example comes with further pulse economy potential. After enqueuing a task, we could call Pulse instead of PulseAll and nothing would break.

Let's recap the difference: with Pulse, a maximum of one thread can awake (and re-check its while-loop blocking condition); with PulseAll, all waiting threads will awake (and re-check their blocking conditions). If we're enqueing a single task, only one worker can handle it, so we need only wake up one worker with a single Pulse. It's rather like having a class of sleeping children – if there's just one ice-cream there's no point in waking them all to queue for it!

In our example we start only two consumer threads, so we would have little to gain. But if we started ten consumers, we might benefit slightly in choosing Pulse over PulseAll. It would mean, though, that if we enqueued multiple tasks, we would need to Pulse multiple times. This can be done within a single lock statement, as follows:

lock (locker) {
taskQ.Enqueue ("task 1");
taskQ.Enqueue ("task 2");
Monitor.Pulse (locker); // "Signal up to two
Monitor.Pulse (locker); // waiting threads."
}

The price of one Pulse too few is a stuck worker. This will usually manifest as an intermittent bug, because it will crop up only when a consumer is in a Waiting state. Hence one could extend the previous maxim "if in doubt, Pulse", to "if in doubt, PulseAll!"

A possible exception to the rule might arise if evaluating the blocking condition was unusually time-consuming.

Using Wait Timeouts

Sometimes it may be unreasonable or impossible to Pulse whenever an unblocking condition arises. An example might be if a blocking condition involves calling a method that derives information from periodically querying a database. If latency is not an issue, the solution is simple: one can specify a timeout when calling Wait, as follows:

lock (locker) {
while ( blocking condition )
Monitor.Wait (locker, timeout);

This forces the blocking condition to be re-checked, at a minimum, at a regular interval specified by the timeout, as well as immediately upon receiving a pulse. The simpler the blocking condition, the smaller the timeout can be without causing inefficiency.

The same system works equally well if the pulse is absent due to a bug in the program! It can be worth adding a timeout to all Wait commands in programs where synchronization is particularly complex – as an ultimate backup for obscure pulsing errors. It also provides a degree of bug-immunity if the program is modified later by someone not on the Pulse!

Races and Acknowledgement

Let's say we want a signal a worker five times in a row:

class Race {
static object locker = new object();
static bool go;

static void Main() {
new Thread (SaySomething).Start();

for (int i = 0; i < 5; i++) {
lock (locker) { go = true; Monitor.Pulse (locker); }
}
}

static void SaySomething() {
for (int i = 0; i < 5; i++) {
lock (locker) {
while (!go) Monitor.Wait (locker);
go = false;
}
Console.WriteLine ("Wassup?");
}
}
}


Expected Output:

 

Wassup?
Wassup?
Wassup?
Wassup?
Wassup?

Actual Output:

 

Wassup?
(hangs)

This program is flawed: the for loop in the main thread can free-wheel right through its five iterations any time the worker doesn't hold the lock. Possibly before the worker even starts! The Producer/Consumer example didn't suffer from this problem because if the main thread got ahead of the worker, each request would simply queue up. But in this case, we need the main thread to block at each iteration if the worker's still busy with a previous task.

A simple solution is for the main thread to wait after each cycle until the go flag is cleared by the worker. This, then, requires that the worker call Pulse after clearing the go flag:

class Acknowledged {
static object locker = new object();
static bool go;

static void Main() {
new Thread (SaySomething).Start();

for (int i = 0; i < 5; i++) {
lock (locker) { go = true; Monitor.Pulse (locker); }
lock (locker) { while (go) Monitor.Wait (locker); }
}
}

static void SaySomething() {
for (int i = 0; i < 5; i++) {
lock (locker) {
while (!go) Monitor.Wait (locker);
go = false; Monitor.Pulse (locker); // Worker must Pulse
}
Console.WriteLine ("Wassup?");
}
}
}

 

Wassup? (repeated five times)

An important feature of such a program is that the worker releases its lock before performing its potentially time-consuming job (this would happen in place of where we're calling Console.WriteLine). This ensures the instigator is not unduly blocked while the worker performs the task for which it has been signaled (and is blocked only if the worker is busy with a previous task).

In this example, only one thread (the main thread) signals the worker to perform a task. If multiple threads were to signal the worker – using our Main method's logic – we would come unstuck. Two signaling threads could each execute the following line of code in sequence:

lock (locker) { go = true; Monitor.Pulse (locker); }

resulting in the second signal being lost if the worker didn't happen to have finish processing the first. We can make our design robust in this scenario by using a pair of flags – a "ready" flag as well as a "go" flag. The "ready" flag indicates that the worker is able to accept a fresh task; the "go" flag is an instruction to proceed, as before. This is analogous to a previous example that performed the same thing using two AutoResetEvents, except more extensible. Here's the pattern, refactored with instance fields:

Wait/Pulse Boilerplate #3: Two-way Signaling

public class Acknowledged {
object locker = new object();
bool ready;
bool go;

public void NotifyWhenReady() {
lock (locker) {
// Wait if the worker's already busy with a previous job
while (!ready) Monitor.Wait (locker);
ready = false;
go = true;
Monitor.PulseAll (locker);
}
}

public void AcknowledgedWait() {
// Indicate that we're ready to process a request
lock (locker) { ready = true; Monitor.Pulse (locker); }

lock (locker) {
while (!go) Monitor.Wait (locker); // Wait for a "go" signal
go = false; Monitor.PulseAll (locker); // Acknowledge signal
}

Console.WriteLine ("Wassup?"); // Perform task
}
}

To demonstrate, we'll start two concurrent threads, each that will notify the worker five times. Meanwhile, the main thread will wait for ten notifications:

public class Test {
static Acknowledged a = new Acknowledged();

static void Main() {
new Thread (Notify5).Start(); // Run two concurrent
new Thread (Notify5).Start(); // notifiers...
Wait10(); // ... and one waiter.
}

static void Notify5() {
for (int i = 0; i < 5; i++)
a.NotifyWhenReady();
}

static void Wait10() {
for (int i = 0; i < 10; i++)
a.AcknowledgedWait();
}
}

 

Wassup?
Wassup?
Wassup?
(repeated ten times)

In the Notify method, the ready flag is cleared before exiting the lock statement. This is vitally important: it prevents two notifiers signaling sequentially without re-checking the flag. For the sake of simplicity, we also set the go flag and call PulseAll in the same lock statement – although we could just as well put this pair of statements in a separate lock and nothing would break.

Simulating Wait Handles

You might have noticed a pattern in the previous example: both waiting loops have the following structure:

lock (locker) {
while (!flag) Monitor.Wait (locker);
flag = false;
...
}

where flag is set to true in another thread. This is, in effect, mimicking an AutoResetEvent. If we omitted flag=false, we'd then have a ManualResetEvent. Using an integer field, Pulse and Wait can also be used to mimic a Semaphore. In fact the only Wait Handle we can't mimic with Pulse and Wait is a Mutex, since this functionality is provided by the lock statement.

Simulating the static methods that work across multiple Wait Handles is in most cases easy. The equivalent of calling WaitAll across multiple EventWaitHandles is nothing more than a blocking condition that incorporates all the flags used in place of the Wait Handles:

lock (locker) {
while (!flag1 && !flag2 && !flag3...) Monitor.Wait (locker);

This can be particularly useful given that WaitAll is in most cases unusable due to COM legacy issues. Simulating WaitAny is simply a matter of replacing the && operator with the || operator.

SignalAndWait is trickier. Recall that this method signals one handle while waiting on another in an atomic operation. We have a situation analogous to a distributed database transaction – we need a two-phase commit! Assuming we wanted to signal flagA while waiting on flagB, we'd have to divide each flag into two, resulting in code that might look something like this:

lock (locker) {
flagAphase1 = true;
Monitor.Pulse (locker);
while (!flagBphase1) Monitor.Wait (locker);

flagAphase2 = true;
Monitor.Pulse (locker);
while (!flagBphase2) Monitor.Wait (locker);
}

perhaps with additional "rollback" logic to retract flagAphase1 if the first Wait statement threw an exception as a result of being interrupted or aborted. This is one situation where Wait Handles are way easier! True atomic signal-and-waiting, however, is actually an unusual requirement.

Wait Rendezvous

Just as WaitHandle.SignalAndWait can be used to rendezvous a pair of threads, so can Wait and Pulse. In the following example, one could say we simulate two ManualResetEvents (in other words, we define two boolean flags!) and then perform reciprocal signal-and-waiting by setting one flag while waiting for the other. In this case we don't need true atomicity in signal-and-waiting, so can avoid the need for a "two-phase commit". As long as we set our flag true and Wait in the same lock statement, the rendezvous will work:

class Rendezvous {
static object locker = new object();
static bool signal1, signal2;

static void Main() {
// Get each thread to sleep a random amount of time.
Random r = new Random();
new Thread (Mate).Start (r.Next (10000));
Thread.Sleep (r.Next (10000));

lock (locker) {
signal1 = true;
Monitor.Pulse (locker);
while (!signal2) Monitor.Wait (locker);
}
Console.Write ("Mate! ");
}

// This is called via a ParameterizedThreadStart
static void Mate (object delay) {
Thread.Sleep ((int) delay);
lock (locker) {
signal2 = true;
Monitor.Pulse (locker);
while (!signal1) Monitor.Wait (locker);
}
Console.Write ("Mate! ");
}
}

 

Mate! Mate! (almost in unison)

Wait and Pulse vs. Wait Handles

Because Wait and Pulse are the most flexible of the synchronization constructs, they can be used in almost any situation. Wait Handles, however, have two advantages:

  • they have the capability of working across multiple processes
  • they are simpler to understand, and harder to break

Additionally, Wait Handles are more interoperable in the sense that they can be passed around via method arguments. In thread pooling, this technique is usefully employed.

In terms of performance, Wait and Pulse have a slight edge, if one follows the suggested design pattern for waiting, that is:

lock (locker)
while ( blocking condition ) Monitor.Wait (locker);

and the blocking condition happens to false from the outset. The only overhead then incurred is that of taking out the lock (tens of nanoseconds) versus the few microseconds it would take to call WaitHandle.WaitOne. Of course, this assumes the lock is uncontended; even the briefest lock contention would be more than enough to even things out; frequent lock contention would make a Wait Handle faster!

Given the potential for variation through different CPUs, operating systems, CLR versions, and program logic; and that in any case a few microseconds is unlikely to be of any consequence before a Wait statement, performance may be a dubious reason to choose Wait and Pulse over Wait Handles, or vice versa.

A sensible guideline is to use a Wait Handle where a particular construct lends itself naturally to the job, otherwise use Wait and Pulse.

(Sưu tầm)


 
 

 
     
 
Công nghệ khác:


Thread in C# - Part 17: Wait and Pulse (1)Thread in C# - Part 16: Non-Blocking Synchronization
Thread in C# - Part 15: Local StorageThread in C# - Part 14: Timers
Thread in C# - Part 13: Asynchronous DelegatesThread in C# - Part 12: Thread Pooling
  Xem tiếp    
 
Lịch khai giảng của hệ thống
 
Ngày
Giờ
T.Tâm
TP Hồ Chí Minh
Hà Nội
 
   
New ADSE - Nhấn vào để xem chi tiết
Mừng Sinh Nhật Lần Thứ 20 FPT-APTECH
Nhấn vào để xem chi tiết
Bảng Vàng Thành Tích Sinh Viên FPT APTECH - Nhấn vào để xem chi tiết
Cập nhật công nghệ miễn phí cho tất cả cựu sinh viên APTECH toàn quốc
Tiết Thực Vì Cộng Đồng
Hội Thảo CNTT
Những khoảnh khắc không phai của Thầy Trò FPT-APTECH Ngày 20-11