© Harry Broeders.
In deze practicum opdracht maak je kennis met Pthreads door een bestaand multi-threaded programma te analyseren. We besteden daarbij vooral veel tijd aan het bestuderen van de real-time scheduling eigenschappen.
Het probleem dat we gaan analyseren is het zogenaamde producer-consumer synchronisatie probleem.
Het voorbeeld programma bestaat uit 3 threads: 2 producers en 1 consumer. De producers produceren data en zetten dit in een buffer. De consumer leest de data uit de buffer en consumeert deze data. Vanzelfsprekend moet ervoor gezorgd worden dat producers en consumer synchroniseren:
Deze synchronisatie kan met behulp van 3 semaphoren kan worden gerealiseerd:
semEmpty
telt het aantal lege plaatsen. wait
(ook wel P
of
passeer
genoemd) op deze semaphore aan voordat ze iets in de
buffer plaatsen. Hierdoor wordt de semaphore, telkens als er 1 plaats in de
buffer gevuld is, met 1 verlaagd. Als de semaphore semEmpty
0
is moeten de producers wachten (er zijn geen lege plaatsen meer, dus de
buffer is vol). Nadat de consumer een plaats uit de buffer heeft geleegd
wordt een post
(ook wel signal
, V
of
verhoog
genoemd) op de semaphore semEmpty
uitgevoerd. Hierdoor wordt de semaphore, telkens als er 1 plaats uit de
buffer geleegd is, met 1 verhoogd. semFilled
telt het aantal gevulde plaatsen.wait
op deze semaphore aan voordat iets uit de buffer wordt
gelezen. Hierdoor wordt de semaphore, telkens als er 1 plaats in de buffer
geleegd is, met 1 verlaagd. Als de semaphore semFilled
0 is
moet de consumer wachten (er zijn geen gevulde plaatsen meer, dus de buffer
is leeg). Nadat de producer een plaats in de buffer heeft gevuld wordt een
post
op de semaphore semFull
uitgevoerd. Hierdoor
wordt de semaphore, telkens als er 1 plaats in de buffer gevuld is, met 1
verhoogd. semMutualExclusive
zorgt voor wederzijdse
uitsluiting.wait
op deze semaphore aan voordat ze de gedeelde
variabelen gebruiken. Als ze klaar zijn met het gebruik van de gedeelde
variabelen wordt een post
op deze semaphore uitgevoerd. Dit
zorgt ervoor dat slecht 1 thread tegelijk de gedeelde variabelen kan
gebruiken. De buffer is als volgt globaal gedefinieerd:
#define SIZE 16
char buffer[SIZE]; // buffer voor het opslaan van SIZE char's
int indexGet = 0; // index waar het volgende element uit de buffer gelezen wordt
int indexPut = 0; // index waar het volgende element in de buffer geschreven wordt
sem_t semMutualExclusive; // binaire semaphore: zorgt voor wederzijdse uitsluiting
sem_t semEmpty; // counting semaphore: telt aantal lege plaatsen
sem_t semFilled; // counting semaphore: telt aantal gevulde plaatsen
Het gebruik van globale variabelen is natuurlijk niet "netjes". Later zullen we deze variabelen daarom "inpakken" in een class. Maar om te beginnen is het gebruik van globale variabelen eenvoudiger.
Het type sem_t
is gedefinieerd in de header file semaphore.h
.
De link verwijst naar de POSIX standaard (IEEE Std 1003.1) documentatie.
Met behulp van de functie put
kan een karakter in de buffer
worden geschreven.
void put(char c) {
check_errno( sem_wait(&semEmpty) ); // verlaag het aantal lege plaatsen. WACHT als er geen lege plaatsen zijn!
check_errno( sem_wait(&semMutualExclusive) ); // ga critisch gebied binnen
buffer[indexPut] = c; indexPut++;
if (indexPut == SIZE) { indexPut = 0; }
check_errno( sem_post(&semMutualExclusive) ); // verlaat critische gebied
check_errno( sem_post(&semFilled) ); // verhoog het aantal gevulde plaatsen.
}
De functies sem_wait
en sem_post
hebben een return
waarde van 0 als er geen fout is opgetreden. In het geval van een error wordt
-1 teruggegeven en wordt de standaard globale variabele errno
gevuld met het errornummer. In het voorbeeldprogramma is de functie
check_errno
geschreven die deze returnwaarde controlleert en in
geval van een error het programma afbreekt met een passende foutmelding.
void check_errno(int error) {
if (error < 0) {
perror("Error");
exit(EXIT_FAILURE);
}
}
Met behulp van de functie get
kan een karakter uit de buffer
worden gelezen.
char get(void) {
char c;
check_errno( sem_wait(&semFilled) ); // verlaag het aantal gevulde plaatsen. WACHT als er geen gevulde plaatsen zijn!
check_errno( sem_wait(&semMutualExclusive) ); // ga critische gebied binnen
c = buffer[indexGet]; indexGet++;
if (indexGet == SIZE) { indexGet = 0; } check_errno( sem_post(&semMutualExclusive) ); // verlaat critische gebied
check_errno( sem_post(&semEmpty) ); // verhoog het aantal lege plaatsen
return c;
}
De semaphoren worden in de main functie als volgt geïnitialiseerd:
check_errno( sem_init(&semMutualExclusive, 0, 1) ); // 1 thread tegelijk toelaten in critische gebied
check_errno( sem_init(&semEmpty, 0, SIZE) ); // er zijn SIZE lege plaatsen
check_errno( sem_init(&semFilled, 0, 0) ); // er zijn 0 gevulde plaatsen
De twee producers gebruiken dezelfde code:
void* producer(void* arg) {
char c = *(char*)arg;
int i;
check_errno( sem_wait(&semPrintf) );
check_errno( printf("Thread: %d met arg: %c gestart\n", pthread_self(), c) );
check_errno( sem_post(&semPrintf) );
for (i = 0; i < 1000; ++i) {
put(c);
}
check_errno( sem_wait(&semPrintf) );
check_errno( printf("Thread: %d gestopt\n", pthread_self()) );
check_errno( sem_post(&semPrintf) ); return NULL;
}
Het karakter dat geproduceerd moet worden, wordt (bij het starten van de
thread) als argument doorgegeven. Met behulp van de put
functie
worden 1000 karakters in de buffer geplaatst. Met behulp van een
printf
wordt het starten en stoppen van de thread gemeld. Er is
een extra semaphore semPrintf
gedefinieerd die er voor zorgt dat
de uitvoer niet afgewisseld wordt met de uitvoer van andere threads.
De consumer is als volgt gecodeerd:
void* consumer(void* arg) {
int i;
char c;
check_errno( sem_wait(&semPrintf) );
check_errno( printf("Thread: %d gestart\n", pthread_self()) );
check_errno( sem_post(&semPrintf) );
for (i = 0; i < 2000; ++i) {
c = get();
check_errno( sem_wait(&semPrintf) );
check_errno( putchar(c) );
check_errno( sem_post(&semPrintf) );
}
check_errno( sem_wait(&semPrintf) );
check_errno( printf("Thread: %d gestopt\n", pthread_self()) );
check_errno( sem_post(&semPrintf) ); return NULL;
}
Er worden 2000 karakters uit de buffer gelezen met behulp van de functie
get
. Elk karakter wordt met de functie putchar
op het
scherm gezet. Ook wordt het starten en stoppen van de thread gemeld.
In de functie main zijn de volgende lokale variabelen gedefinieerd:
struct sched_param sp, spc, spp1, spp2;
int p, prioc, priop1, priop2;
pthread_attr_t ptac, ptap1, ptap2;
pthread_t ptp1, ptp2, ptc;
char frikadel='F', kroket='K';
In de functie main
krijgt de main thread een hoge prioriteit.
De main thread moet de hoogste prioriteit hebben omdat deze thread de andere
threads start en we willen de onderlinge wisselwerking van deze threads
bestuderen.
Het wijzigen van de prioriteit van een thread is redelijk ingewikkeld. Eerst
moeten met behulp van de functie pthread_getschedparam
de scheduling parameters worden opgehaald. Deze scheduling parameters zijn van
het type struct sched_param
. In deze scheduling parameters kan dan
de prioriteit worden gezet in het dataveld sched_priority
. Tot
slot moeten de gewijzigde scheduling parameters weer aan de thread worden
doorgegeven met behulp van de functie pthread_setschedparam
.
check( pthread_getschedparam(pthread_self(), &p, &sp) );
sp.sched_priority = 60;
check( pthread_setschedparam(pthread_self(), SCHED_FIFO, &sp) );
De pthread_xxx
functies hebben een return waarde van 0 als er
geen fout is opgetreden. In het geval van een error wordt het errornummer
teruggegeven. De standaard globale variabele errno
wordt
niet gevuld met het errornummer. In het voorbeeldprogramma is
de functie check
geschreven die de returnwaarde van een pthread
functie controlleert en in geval van een error het programma afbreekt met een
passende foutmelding.
void check(int error) {
if (error != 0) {
fprintf(stderr, "Error: %s\n", strerror(error));
exit(EXIT_FAILURE);
}
}
Volgens de IEEE Std 1003.1 kan de prioriteit van een thread ook op een eenvoudige wijze veranderd worden. Helaas wordt dit in QNX niet ondersteund.
check( pthread_setschedprio(pthread_self(), 60) ); // Werkt wel volgens IEEE Std 1003.1 maar niet in QNX
In dit programma "bakt" de ene producer frikadellen (weergegeven
met de letter
F
) en "bakt" de andere producer kroketten
(weergegeven met de letter K
). De consumer "eet" de frikadellen en
kroketten op.
Bij het starten van het programma moeten op de command line 3 parameters
worden meegegeven. Dit zijn achtereenvolgens de prioriteit van de consumer, de
prioriteit van de frikadellen producer en de prioriteit van de kroketten
producer. Deze prioriteiten worden opgeslagen in de variabelen
prioc
, priop1
en priop2
.
Bij het starten van een thread kan via een variabele van het type
pthread_attr_t
onder andere de scheduling policy en de scheduling
priority worden opgegeven. Zie eventueel paragraaf 12.6 van het theorieboek.
Een variabele van het type pthread_attr_t
kan worden
geïnitialiseerd met de default waarden met behulp van de functie pthread_attr_init
:
check( pthread_attr_init(&ptac) );
check( pthread_attr_init(&ptap1) );
check( pthread_attr_init(&ptap2) );
Met behulp van de functie pthread_attr_setinheritsched
wordt aangegeven dat bij het opstarten van de thread de scheduling priority en
policy die in de thread attributes zijn opgegeven gebruikt moet worden. Als je
het aanroepen van deze functie vergeet zal voor de op te starten thread altijd
de prioriteit en policy van de parent thread gebruikt worden.
Met behulp van de functie pthread_attr_setschedpolicy
wordt de scheduling policy opgegeven. In dit programma is gekozen voor de
SHED_FIFO
policy. Dit wil zeggen dat bij threads met gelijke
prioriteit geen time slicing wordt gebruikt. Een thread wordt pas afgewisseld
door een thread met gelijke prioriteit als de eerste thread zelf waiting wordt
(bijvoorbeeld doordat een input/output bewerking is aangeroepen). Een andere
scheduling policy die ook door QNX wordt ondersteund is SHED_RR
.
Bij deze round-robbin scheduling worden threads met gelijke prioriteit na het
verstrijken van een time-slice afgewisseld.
check( pthread_attr_setschedpolicy(&ptac, SCHED_FIFO) );
check( pthread_attr_setschedpolicy(&ptap1, SCHED_FIFO) );
check( pthread_attr_setschedpolicy(&ptap2, SCHED_FIFO) );
Het opgeven van de prioriteit waarmee een thread gestart moet worden is
omslachtig. Eerst moeten met behulp van de functie pthread_attr_getschedparam
de scheduling parameters worden opgehaald (uit de thread attributes). Deze
scheduling parameters zijn van het type struct sched_param
. In
deze scheduling parameters kan dan de prioriteit worden gezet in het dataveld
sched_priority
. Tot slot moeten de scheduling parameters weer in
de thread parameters worden opgeslagen met behulp van de functie pthread_attr_setschedparam
.
check( pthread_attr_getschedparam(&ptac, &spc) );
check( pthread_attr_getschedparam(&ptap1, &spp1) );
check( pthread_attr_getschedparam(&ptap2, &spp2) );
spc.sched_priority = prioc;
spp1.sched_priority = priop1;
spp2.sched_priority = priop2;
check( pthread_attr_setschedparam(&ptac, &spc) );
check( pthread_attr_setschedparam(&ptap1, &spp1) );
check( pthread_attr_setschedparam(&ptap2, &spp2) );
Nu de thread attributes zijn "ingesteld" kunnen de threads worden opgestart
met behulp van de functie pthread_create
:
check( pthread_create(&ptc, &ptac, consumer, 0) );
check( pthread_create(&ptp1, &ptap1, producer, &frikadel) );
check( pthread_create(&ptp2, &ptap2, producer, &kroket) );
De thread ID's van deze threads zijn van het type pthread_t
en
worden in de variabelen ptc
, ptp1
en
ptp2
opgeslagen.
Vervolgens moet de main thread wachten totdat de andere threads geëindigd zijn. Bij het beëindigen van de main thread van een proces worden namelijk ook alle andere threads van het proces gestopt en wordt het gehele proces beëindigd.
check( pthread_join(ptc, 0) );
check( pthread_join(ptp1, 0) );
check( pthread_join(ptp2, 0) );
Als alle threads beëindigd zijn worden de aangemaakte semaphoren en thread attributes opgeruimd:
check_errno( sem_destroy(&semMutualExclusive) );
check_errno( sem_destroy(&semEmpty) );
check_errno( sem_destroy(&semFilled) );
check( pthread_attr_destroy(&ptac) );
check( pthread_attr_destroy(&ptap1) );
check( pthread_attr_destroy(&ptap2) );
Dit is in dit geval niet stikt noodzakelijk omdat bij het eindigen van het proces toch alles automatisch wordt opgeruimd.
Het complete voorbeeld programma in C en de bijbehorende makefile kun je vinden in de files opdr2.c en makefile.
De buffer uit het voorbeeldprogramma in C is niet erg herbruikbaar. In C++
kun je alle data en functies van de buffer "inkapselen" in de class
CircularBuffer
. Door een template
te gebruiken kunnen
we een herbuikbaar buffer definiëren dat met elk willekeurig type gebruikt kan
worden. Ook het aantal plaatsen in de buffer kunnen we met behulp van een
template parameter instelbaar maken.
template<typename T, int size>
class CircularBuffer {
public:
CircularBuffer(); // constructor zorgt voor de initialisatiie van de datamembers
~CircularBuffer();
void put(const T& t); // schrijf element in de buffer. WACHT als buffer vol is!
const T get(); // lees een element uit de buffer. WACHT als buffer leeg is!
private:
T buffer[size]; // buffer voor het opslaan van size elementen van het type T
int indexGet; // index waar het volgende element uit de buffer gelezen wordt
int indexPut; // index waar het volgende element in de buffer geschreven wordt
sem_t semMutualExclusive; // binaire semaphore: zorgt voor wederzijdse uitsluiting
sem_t semEmpty; // counting semaphore: telt aantal lege plaatsen
sem_t semFilled; // counting semaphore: telt aantal gevulde plaatsen
};
De semaphoren worden nu geïnitialiseerd in de constructor en opgeruimd in de destructor.
Het complete voorbeeld programma in C++ en de bijbehorende makefile kun je vinden in de files opdr2.cpp en makefile.
Opdracht 2a.Compileer het programma opdr2.c of opdr2.cpp met de bijbehorende makefile en start dit programma met de prioriteiten: consumer = 22, frikadellen producer = 21 en kroketten producer = 20. opdr2.out 22 21 20 Verklaar de uitvoer! Wees nauwkeurig in je verklaring. Verklaar waarom eerst alle frikadellen gebakken worden en pas daarna de kroketten. Verklaar waarom er geen frikadellen geconsumeerd worden nadat de frikadellenbakker gestopt is. Om het programma te analyseren kun je eventueel gebruik maken van de DDD debugger. Maak aantekeningen zodat de docent deze opdracht vlot kan aftekenen. |
De code van de consumer lijkt inefficient:
In C:
c = get();
check_errno( sem_wait(&semPrintf) );
check_errno( putchar(c) );
check_errno( sem_post(&semPrintf) );
In C++:
char c(b.get());
check_errno( sem_wait(&semCout) );
cout << c;
check_errno( sem_post(&semCout) );
Het gebruik van de lokale variabele lijkt overbodig.
In C:
check_errno( sem_wait(&semPrintf) );
check_errno( putchar(get()) );
check_errno( sem_post(&semPrintf) );
In C++:
check_errno( sem_wait(&semCout) );
cout << b.get();
check_errno( sem_post(&semCout) );
Om threads met elkaar te synchroniseren kunnen we in plaats van semaphoren ook mutexen en conditionele variabelen gebruiken. Zie eventueel paragraaf 5.7 in het theorieboek. De relevante IEEE Std 1003.1 functies zijn:
pthread_mutex_init
![]() |
pthread_mutex_init
![]() |
pthread_mutex_destroy
![]() |
pthread_mutex_destroy
![]() |
pthread_mutex_lock
![]() |
pthread_mutex_lock
![]() |
pthread_mutex_unlock
![]() |
pthread_mutex_unlock
![]() |
Opdracht 2e.Vervang in het voorbeeld programma de synchronisatie met semaphoren door synchronisatie met mutexen en conditionele variabelen. |