As an example that brings together many of the concepts in this chapter, consider a hypothetical robotic assembly line for automobiles. Each Car will be built in several stages, and in this example we’ll look at a single stage: after the chassis has been created, at the time when the engine, drive train, and wheels are attached. The Cars are transported from one place to another via a CarQueue, which is a type of TQueue. A Director takes each Car (as a raw chassis) from the incoming CarQueue and places it in a Cradle, which is where all the work is done. At this point, the Director tells all the waiting robots (using broadcast( )) that the Car is in the Cradle ready for the robots to work on it. The three types of robots go to work, sending a message to the Cradle when they finish their tasks. The Director waits until all the tasks are complete and then puts the Car onto the outgoing CarQueue to be transported to the next operation. In this case, the consumer of the outgoing CarQueue is a Reporter object, which just prints the Car to show that the tasks have been properly completed.
//: C11:CarBuilder.cpp
// How broadcast() works.
//{L} ZThread
#include "zthread/Thread.h"
#include "zthread/Mutex.h"
#include "zthread/Guard.h"
#include "zthread/Condition.h"
#include "zthread/ThreadedExecutor.h"
#include "TQueue.h"
#include
#include
using namespace ZThread;
using namespace std;
class Car {
bool engine, driveTrain, wheels;
int id;
public:
Car(int idn) : id(idn), engine(false),
driveTrain(false), wheels(false) {}
// Empty Car object:
Car() : id(-1), engine(false),
driveTrain(false), wheels(false) {}
// Unsynchronized -- assumes atomic bool operations:
int getId() { return id; }
void addEngine() { engine = true; }
bool engineInstalled() { return engine; }
void addDriveTrain() { driveTrain = true; }
bool driveTrainInstalled() { return driveTrain; }
void addWheels() { wheels = true; }
bool wheelsInstalled() { return wheels; }
friend ostream& operator<<(ostream& os, const Car& c) {
return os << "Car " << c.id << " ["
<< " engine: " << c.engine
<< " driveTrain: " << c.driveTrain
<< " wheels: " << c.wheels << " ]";
}
};
typedef CountedPtr< TQueue
class ChassisBuilder : public Runnable {
CarQueue carQueue;
int counter;
public:
ChassisBuilder(CarQueue& cq) : carQueue(cq), counter(0){}
void run() {
try {
while(!Thread::interrupted()) {
Thread::sleep(1000);
// Make chassis:
Car c(counter++);
cout << c << endl;
// Insert into queue
carQueue->put(c);
}
} catch(Interrupted_Exception&) { /* Exit */ }
cout << "ChassisBuilder off" << endl;
}
};
class Cradle {
Car c; // Holds current car being worked on
bool occupied;
Mutex workLock, readyLock;
Condition workCondition, readyCondition;
bool engineBotHired, wheelBotHired, driveTrainBotHired;
public:
Cradle()
: workCondition(workLock), readyCondition(readyLock) {
occupied = false;
engineBotHired = true;
wheelBotHired = true;
driveTrainBotHired = true;
}
void insertCar(Car chassis) {
c = chassis;
occupied = true;
}
Car getCar() { // Can only extract car once
if(!occupied) {
cerr << "No Car in Cradle for getCar()" << endl;
return Car(); // "Null" Car object
}
occupied = false;
return c;
}
// Access car while in cradle:
Car* operator->() { return &c }
// Allow robots to offer services to this cradle:
void offerEngineBotServices() {
Guard
while(engineBotHired)
workCondition.wait();
engineBotHired = true; // Accept the job
}
void offerWheelBotServices() {
Guard
while(wheelBotHired)
workCondition.wait();
wheelBotHired = true; // Accept the job
}
void offerDriveTrainBotServices() {
Guard
while(driveTrainBotHired)
workCondition.wait();
driveTrainBotHired = true; // Accept the job
}
// Tell waiting robots that work is ready:
void startWork() {
Guard
engineBotHired = false;
wheelBotHired = false;
driveTrainBotHired = false;
workCondition.broadcast();
}
// Each robot reports when their job is done:
void taskFinished() {
Guard
readyCondition.signal();
}
// Director waits until all jobs are done:
void waitUntilWorkFinished() {
Guard
while(!(c.engineInstalled() && c.driveTrainInstalled()
&& c.wheelsInstalled()))
readyCondition.wait();
}
};
typedef CountedPtr
class Director : public Runnable {
CarQueue chassisQueue, finishingQueue;
CradlePtr cradle;
public:
Director(CarQueue& cq, CarQueue& fq, CradlePtr cr)
: chassisQueue(cq), finishingQueue(fq), cradle(cr) {}
void run() {
try {
while(!Thread::interrupted()) {
// Blocks until chassis is available:
cradle->insertCar(chassisQueue->get());
// Notify robots car is ready for work
cradle->startWork();
// Wait until work completes
cradle->waitUntilWorkFinished();
// Put car into queue for further work
finishingQueue->put(cradle->getCar());
}
} catch(Interrupted_Exception&) { /* Exit */ }
cout << "Director off" << endl;
}
};
class EngineRobot : public Runnable {
CradlePtr cradle;
public:
EngineRobot(CradlePtr cr) : cradle(cr) {}