© Harry Broeders.
In deze practicum opdracht maak je kennis met Pthreads door een bestaand multi-threaded programma te analyseren. We besteden daarbij vooral veel tijd aan het bestuderen van de real-time scheduling eigenschappen.
Het probleem dat we gaan analyseren is het zogenaamde producer-consumer synchronisatie probleem.
Het voorbeeld programma bestaat uit 3 threads: 2 producers en 1 consumer. De producers produceren data en zetten dit in een buffer. De consumer leest de data uit de buffer en consumeert deze data. Vanzelfsprekend moet ervoor gezorgd worden dat producers en consumer synchroniseren:
Bij het vak OPSYS uit de basis hebben jullie geleerd dat deze synchronisatie met behulp van 3 semaphoren kan worden gerealiseerd:
semEmpty
telt het aantal lege plaatsen.
wait
(ook wel P
of
passeer
genoemd) op deze semaphore aan voordat ze iets in de
buffer plaatsen. Hierdoor wordt de semaphore telkens als er 1 plaats in de
buffer gevuld is met 1 verlaagd. Als de semaphore semEmpty
0
is moeten de producers wachten (er zijn geen lege plaatsen meer, dus de buffer
is vol). Nadat de consumer een plaats uit de buffer heeft geleegd wordt een
post
(ook wel signal
, V
of
verhoog
genoemd) op de semaphore semEmpty
uitgevoerd.
Hierdoor wordt de semaphore telkens als er 1 plaats uit de buffer geleegd
is met 1 verhoogd.
semFilled
telt het aantal gevulde
plaatsen.wait
op deze semaphore aan voordat iets uit de buffer wordt
gelezen. Hierdoor wordt de semaphore telkens als er 1 plaats in de buffer
geleegd is met 1 verlaagd. Als de semaphore semFilled
0 is moet
de consumer wachten (er zijn geen gevulde plaatsen meer, dus de buffer is
leeg). Nadat de producer een plaats in de buffer heeft gevuld wordt een
post
op de semaphore semFull
uitgevoerd. Hierdoor
wordt de semaphore telkens als er 1 plaats in de buffer gevuld is met 1 verhoogd.
semMutualExclusive
zorgt voor wederzijdse
uitsluiting.wait
op deze semaphore aan voordat ze de gedeelde
variabelen gebruiken. Als ze klaar zijn met het gebruik van de gedeelde
variabelen wordt een post
op deze semaphore uitgevoerd. Dit
zorgt ervoor dat slecht 1 thread tegelijk de gedeelde variabelen kan gebruiken.
De buffer is als volgt globaal gedefinieerd:
#define SIZE 16
char buffer[SIZE]; // buffer voor het opslaan van SIZE char's
int indexGet; // index waar het volgende element uit de buffer gelezen wordt
int indexPut; // index waar het volgende element in de buffer geschreven wordt
sem_t semMutualExclusive; // binaire semaphore: zorgt voor wederzijdse uitsluiting
sem_t semEmpty; // counting semaphore: telt aantal lege plaatsen
sem_t semFilled; // counting semaphore: telt aantal gevulde plaatsen
Het gebruik van globale variabelen is natuurlijk niet "netjes". Later zullen we deze variabelen daarom "inpakken" in een class. Maar om te beginnen is het gebruik van globale variabelen eenvoudiger.
Het type sem_t
is gedefinieerd in de header file
semaphore.h
.
De link verwijst naar de POSIX standaard (IEEE Std 1003.1) documentatie.
Met behulp van de functie put
kan een karakter in de buffer
worden geschreven.
void put(const char c) {
check_errno( sem_wait(&semEmpty) ); // verlaag het aantal lege plaatsen. WACHT als er geen lege plaatsen zijn!
check_errno( sem_wait(&semMutualExclusive) ); // ga critisch gebied binnen
buffer[indexPut++]=c;
indexPut&=SIZE-1; // slim maar werkt alleen als SIZE een macht van 2 is
check_errno( sem_post(&semMutualExclusive) ); // verlaat critische gebied
check_errno( sem_post(&semFilled) ); // verhoog het aantal gevulde plaatsen.
}
De functies sem_wait
en sem_post
hebben een return
waarde van 0 als er geen fout is opgetreden. In het geval van een error wordt
-1 teruggegeven en wordt de standaard globale variabele errno
gevuld met het errornummer. In het voorbeeldprogramma is de functie
check_errno
geschreven die deze returnwaarde controlleert en
in geval van een error het programma afbreekt met een passende foutmelding.
void check_errno(int error) {
if (error<0) {
perror("Error");
exit(EXIT_FAILURE);
}
}
Met behulp van de functie get
kan een karakter uit de buffer
worden gelezen.
char get(void) {
char c;
check_errno( sem_wait(&semFilled) ); // verlaag het aantal gevulde plaatsen. WACHT als er geen gevulde plaatsen zijn!
check_errno( sem_wait(&semMutualExclusive) ); // ga critische gebied binnen
c=buffer[indexGet++];
indexGet&=SIZE-1; // slim maar werkt alleen als SIZE een macht van 2 is
check_errno( sem_post(&semMutualExclusive) ); // verlaat critische gebied
check_errno( sem_post(&semEmpty) ); // verhoog het aantal lege plaatsen
return c;
}
De semaphoren worden in de main functie als volgt geïnitialiseerd:
check_errno( sem_init(&semMutualExclusive, 0, 1) ); // 1 thread tegelijk toelaten in critische gebied
check_errno( sem_init(&semEmpty, 0, SIZE) ); // er zijn SIZE lege plaatsen
check_errno( sem_init(&semFilled, 0, 0) ); // er zijn 0 gevulde plaatsen
De twee producers gebruiken dezelfde code:
void* producer(void* arg) {
char c=*(char*)arg;
int i;
check_errno( sem_wait(&semPrintf) );
check_errno( printf("Thread: %d met arg: %c gestart\n", pthread_self(), c) );
check_errno( sem_post(&semPrintf) );
for (i=0; i<1000; ++i) {
put(c);
}
check_errno( sem_wait(&semPrintf) );
check_errno( printf("Thread: %d gestopt\n", pthread_self()) );
check_errno( sem_post(&semPrintf) ); return NULL;
}
Het karakter dat geproduceerd moet worden, wordt (bij het starten van de
thread) als argument doorgegeven. Met behulp van de put
functie
worden 1000 karakters in de buffer geplaatst. Met behulp van een
printf
wordt het starten en stoppen van de thread gemeld. Er
is een extra semaphore semPrintf
gedefinieerd die er voor zorgt
dat de uitvoer niet afgewisseld wordt met de uitvoer van andere threads.
De consumer is als volgt gecodeerd:
void* consumer(void* arg) {
int i;
char c;
check_errno( sem_wait(&semPrintf) );
check_errno( printf("Thread: %d gestart\n", pthread_self()) );
check_errno( sem_post(&semPrintf) );
for (i=0; i<2000; ++i) {
c=get();
check_errno( sem_wait(&semPrintf) );
check_errno( putchar(c) );
check_errno( sem_post(&semPrintf) );
}
check_errno( sem_wait(&semPrintf) );
check_errno( printf("Thread: %d gestopt\n", pthread_self()) );
check_errno( sem_post(&semPrintf) ); return NULL;
}
Er worden 2000 karakters uit de buffer gelezen met behulp van de functie
get
. Elk karakter wordt met de functie putchar
op het scherm gezet. Ook wordt het starten en stoppen van de thread gemeld.
In de functie main zijn de volgende lokale variabelen gedefinieerd:
struct sched_param sp, spc, spp1, spp2;
int p, prioc, priop1, priop2;
pthread_attr_t ptac, ptap1, ptap2;
pthread_t ptp1, ptp2, ptc;
char frikadel='F', kroket='K';
In de functie main
krijgt de main thread een hoge prioriteit.
De main thread moet de hoogste prioriteit hebben omdat deze thread de andere
threads start en we willen de onderlinge wisselwerking van deze threads
bestuderen.
Het wijzigen van de prioriteit van een thread is redelijk ingewikkeld. Eerst
moeten met behulp van de functie
pthread_getschedparam
de scheduling parameters worden opgehaald. Deze scheduling parameters zijn
van het type struct sched_param
. In deze scheduling parameters
kan dan de prioriteit worden gezet in het dataveld
sched_priority
. Tot slot moeten de gewijzigde scheduling parameters
weer aan de thread worden doorgegeven met behulp van de functie
pthread_setschedparam
.
check( pthread_getschedparam(pthread_self(), &p, &sp) );
sp.sched_priority=60;
check( pthread_setschedparam(pthread_self(), SCHED_FIFO, &sp) );
De pthread_xxx
functies hebben een return waarde van 0 als er
geen fout is opgetreden. In het geval van een error wordt het errornummer
teruggegeven. De standaard globale variabele errno
wordt
niet gevuld met het errornummer. In het voorbeeldprogramma
is de functie check
geschreven die de returnwaarde van een pthread
functie controlleerd en in geval van een error het programma afbreekt met
een passende foutmelding.
void check(int error) {
if (error!=0) {
fprintf(stderr, "Error: %s\n", strerror(error));
exit(EXIT_FAILURE);
}
}
Volgens de IEEE Std 1003.1 kan de prioriteit van een thread ook op een eenvoudige wijze veranderd worden. Helaas wordt dit in QNX niet ondersteund.
check( pthread_setschedprio(pthread_self(), 60) ); // Werkt wel volgens IEEE Std 1003.1 maar niet in QNX
In dit programma
"bakt" de ene producer frikadellen (weergegeven met de letter
F
)
en "bakt" de andere producer kroketten (weergegeven met de letter
K
). De consumer "eet" de frikadellen en kroketten op.
Bij het starten van het programma moeten op de command line 3 parameters
worden meegegeven. Dit zijn achtereenvolgens de prioriteit van de consumer,
de prioriteit van de frikadellen producer en de prioriteit van de kroketten
producer. Deze prioriteiten worden opgeslagen in de variabelen
prioc
, priop1
en priop2
.
Bij het starten van een thread kan via een variabele van het type
pthread_attr_t
onder andere de scheduling policy en de scheduling
priority worden opgegeven. Zie eventueel paragraaf 13.14.2 van het theorieboek.
Een variabele van het type pthread_attr_t
kan worden
geïnitialiseerd met de default waarden met behulp van de functie
pthread_attr_init
:
check( pthread_attr_init(&ptac) );
check( pthread_attr_init(&ptap1) );
check( pthread_attr_init(&ptap2) );
Met behulp van de functie
pthread_attr_setinheritsched
wordt aangegeven dat bij het opstarten van de thread de scheduling priority
en policy die in de thread attributes zijn opgegeven gebruikt moet worden.
Als je het aanroepen van deze functie vergeet zal voor de op te starten thread
altijd de prioriteit en policy van de parent thread gebruikt worden.
Met behulp van de functie
pthread_attr_setschedpolicy
wordt de scheduling policy opgegeven. In dit programma is gekozen voor de
SHED_FIFO
policy. Dit wil zeggen dat bij threads met gelijke
prioriteit geen time slicing wordt gebruikt. Een thread wordt pas afgewisseld
door een thread met gelijke prioriteit als de eerste thread zelf waiting
wordt (bijvoorbeeld doordat een input/output bewerking is aangeroepen). Een
andere scheduling policy die ook door QNX wordt ondersteund is
SHED_RR
. Bij deze round-robbin scheduling worden threads met
gelijke prioriteit na het verstrijken van een time-slice afgewisseld.
check( pthread_attr_setschedpolicy(&ptac, SCHED_FIFO) );
check( pthread_attr_setschedpolicy(&ptap1, SCHED_FIFO) );
check( pthread_attr_setschedpolicy(&ptap2, SCHED_FIFO) );
Het opgeven van de prioriteit waarmee een thread gestart moet worden is
omslachtig. Eerst moeten met behulp van de functie
pthread_attr_getschedparam
de scheduling parameters worden opgehaald (uit de thread attributes). Deze
scheduling parameters zijn van het type struct sched_param
.
In deze scheduling parameters kan dan de prioriteit worden gezet in het dataveld
sched_priority
. Tot slot moeten de scheduling parameters weer
in de thread parameters worden opgeslagen met behulp van de functie
pthread_attr_setschedparam
.
check( pthread_attr_getschedparam(&ptac, &spc) );
check( pthread_attr_getschedparam(&ptap1, &spp1) );
check( pthread_attr_getschedparam(&ptap2, &spp2) );
spc.sched_priority=prioc;
spp1.sched_priority=priop1;
spp2.sched_priority=priop2;
check( pthread_attr_setschedparam(&ptac, &spc) );
check( pthread_attr_setschedparam(&ptap1, &spp1) );
check( pthread_attr_setschedparam(&ptap2, &spp2) );
Nu de thread attributes zijn "ingesteld" kunnen de threads worden opgestart
met behulp van de functie
pthread_create
:
check( pthread_create(&ptc, &ptac, consumer, 0) );
check( pthread_create(&ptp1, &ptap1, producer, &frikadel) );
check( pthread_create(&ptp2, &ptap2, producer, &kroket) );
De thread ID's van deze threads zijn van het type pthread_t
en worden in de variabelen ptc
, ptp1
en
ptp2
opgeslagen.
Vervolgens moet de main thread wachten totdat de andere threads geëindigd zijn. Bij het beëindigen van de main thread van een proces worden namelijk ook alle andere threads van het proces gestopt en wordt het gehele proces beëindigd.
check( pthread_join(ptc, 0) );
check( pthread_join(ptp1, 0) );
check( pthread_join(ptp2, 0) );
Als alle threads beëindigd zijn worden de aangemaakte semaphoren en thread attributes opgeruimd:
check_errno( sem_destroy(&semMutualExclusive) );
check_errno( sem_destroy(&semEmpty) );
check_errno( sem_destroy(&semFilled) );
check( pthread_attr_destroy(&ptac) );
check( pthread_attr_destroy(&ptap1) );
check( pthread_attr_destroy(&ptap2) );
Dit is in dit geval niet stikt noodzakelijk omdat bij het eindigen van het proces toch alles automatisch wordt opgeruimd.
Het complete voorbeeld programma in C en de bijbehorende makefile kun je vinden in de files opdr2.c en makefile.
De buffer uit het voorbeeldprogramma in C is niet erg herbruikbaar. In C++
kun je alle data en functies van de buffer "inkapselen" in de class
CircularBuffer
. Door een template
te gebruiken
kunnen we een herbuikbaar buffer definiëren dat met elk willekeurig
type gebruikt kan worden. Ook het aantal plaatsen in de buffer kunnen we
met behulp van een template parameter instelbaar maken.
template<typename T, int size> // size moet een macht van 2 zijn!
class CircularBuffer {
public:
CircularBuffer(); // constructor zorgt voor de initialisatiie van de datamembers
~CircularBuffer();
void put(const T& t); // schrijf element in de buffer. WACHT als buffer vol is!
const T get(); // lees een element uit de buffer. WACHT als buffer leeg is!
private:
T buffer[size]; // buffer voor het opslaan van size elementen van het type T
int indexGet; // index waar het volgende element uit de buffer gelezen wordt
int indexPut; // index waar het volgende element in de buffer geschreven wordt
sem_t semMutualExclusive; // binaire semaphore: zorgt voor wederzijdse uitsluiting
sem_t semEmpty; // counting semaphore: telt aantal lege plaatsen
sem_t semFilled; // counting semaphore: telt aantal gevulde plaatsen
};
De semaphoren worden nu geïnitialiseerd in de constructor en opgeruimd in de destructor.
Het complete voorbeeld programma in C++ en de bijbehorende makefile kun je vinden in de files opdr2.cpp en makefile.
Opdracht 2a.Compileer het programma opdr2.c of opdr2.cpp met de bijbehorende makefile en start dit programma met de prioriteiten: consumer = 22, frikadellen producer = 21 en kroketten producer = 20. opdr2.out 22 21 20 Verklaar de uitvoer! Wees nauwkeurig in je verklaring. Verklaar waarom eerst alle frikadellen gebakken worden en pas daarna de kroketten. Verklaar waarom er geen frikadellen geconsumeerd worden nadat de frikadellenbakker gestopt is. Om het programma te analyseren kun je eventueel gebruik maken van de DDD debugger. |
De code van de consumer lijkt inefficient:
In C:
c=get();
check_errno( sem_wait(&semPrintf) );
check_errno( putchar(c) );
check_errno( sem_post(&semPrintf) );
In C++:
char c(b.get());
check_errno( sem_wait(&semCout) );
cout<<c;
check_errno( sem_post(&semCout) );
Het gebruik van de lokale variabele lijkt overbodig.
In C:
check_errno( sem_wait(&semPrintf) );
check_errno( putchar(get()) );
check_errno( sem_post(&semPrintf) );
In C++:
check_errno( sem_wait(&semCout) );
cout<<b.get();
check_errno( sem_post(&semCout) );
Bij het gebruik van priority scheduling kan priority inversion optreden. Dit is ongewenst. Bestudeer paragraaf 13.10 van het theorie boek waarin priority inversion besproken wordt. Een oplossing om priority inversion te voorkomen is priority inheritance, dit wordt ook in paragraaf 13.10 uitgelegd. QNX gebruikt priority inheritance om priority inversion te voorkomen. Bestudeer indien nodig ook pagina 167 en verder in het QNX boek.
Om threads met elkaar te synchroniseren kunnen we in plaats van semaphoren ook mutexen en conditionele variabelen gebruiken. Zie eventueel paragraaf 8.6.3 in het theorieboek. De relevante IEEE Std 1003.1 functies zijn:
pthread_mutex_init
![]() |
pthread_mutex_init
![]() |
pthread_mutex_destroy
![]() |
pthread_mutex_destroy
![]() |
pthread_mutex_lock
![]() |
pthread_mutex_lock
![]() |
pthread_mutex_unlock
![]() |
pthread_mutex_unlock
![]() |
pthread_cond_init ![]() |
pthread_cond_init
![]() |
pthread_cond_destroy ![]() |
pthread_cond_destroy
![]() |
pthread_cond_wait ![]() |
pthread_cond_wait
![]() |
pthread_cond_signal ![]() |
pthread_cond_signal
![]() |
Opdracht 2e.Vervang in het voorbeeld programma de synchronisatie met semaphoren door synchronisatie met mutexen en conditionele variabelen. |
Let op! Opgave 2f kun je pas maken na de theorie van dagdeel 3. Je kunt deze theorie zelf bestuderen of je kunt eerst opgave 3 maken en daarna opgave 2f.
Bij het gebruik van semaphoren, mutexen en conditionele variabelen wordt gemeenschappelijk geheugen gebruikt om te synchroniseren. Het is ook mogelijk om met behulp van message passing te synchroniseren. De IEEE Std 1003.1 definieert voor dit doel zogenaamde message queues. Zie eventueel paragraaf 9.5 en 13.14.2 in het theorieboek. De relevante functies zijn:
mq_open
![]() |
mq_open
![]() |
mq_close
![]() |
mq_close
![]() |
mq_unlink
![]() |
mq_unlink
![]() |
mq_send
![]() |
mq_send
![]() |
mq_receive
![]() |
mq_receive
![]() |