RTSYST Opdracht 2: Multi-threaded programmeren in POSIX.

© Harry Broeders.

Pthreads.

In deze practicum opdracht maak je kennis met Pthreads door een bestaand multi-threaded programma te analyseren. We besteden daarbij vooral veel tijd aan het bestuderen van de real-time scheduling eigenschappen.

Voorbeeld programma in C.

Het probleem dat we gaan analyseren is het zogenaamde producer-consumer synchronisatie probleem.

Het voorbeeld programma bestaat uit 3 threads: 2 producers en 1 consumer. De producers produceren data en zetten dit in een buffer. De consumer leest de data uit de buffer en consumeert deze data. Vanzelfsprekend moet ervoor gezorgd worden dat producers en consumer synchroniseren:

Deze synchronisatie kan met behulp van 3 semaphoren kan worden gerealiseerd:

De buffer is als volgt globaal gedefinieerd:

#define SIZE 16
char buffer[SIZE]; // buffer voor het opslaan van SIZE char's
int indexGet = 0; // index waar het volgende element uit de buffer gelezen wordt
int indexPut = 0; // index waar het volgende element in de buffer geschreven wordt
sem_t semMutualExclusive; // binaire semaphore: zorgt voor wederzijdse uitsluiting
sem_t semEmpty; // counting semaphore: telt aantal lege plaatsen
sem_t semFilled; // counting semaphore: telt aantal gevulde plaatsen

Het gebruik van globale variabelen is natuurlijk niet "netjes". Later zullen we deze variabelen daarom "inpakken" in een class. Maar om te beginnen is het gebruik van globale variabelen eenvoudiger.

Het type sem_t is gedefinieerd in de header file semaphore.h. De link verwijst naar de POSIX standaard (IEEE Std 1003.1) documentatie.

Met behulp van de functie put kan een karakter in de buffer worden geschreven.

void put(char c) {
check_errno( sem_wait(&semEmpty) ); // verlaag het aantal lege plaatsen. WACHT als er geen lege plaatsen zijn!
check_errno( sem_wait(&semMutualExclusive) ); // ga critisch gebied binnen
buffer[indexPut] = c; indexPut++;
if (indexPut == SIZE) { indexPut = 0; }
check_errno( sem_post(&semMutualExclusive) ); // verlaat critische gebied
check_errno( sem_post(&semFilled) ); // verhoog het aantal gevulde plaatsen.
}

De functies sem_wait en sem_post hebben een return waarde van 0 als er geen fout is opgetreden. In het geval van een error wordt -1 teruggegeven en wordt de standaard globale variabele errno gevuld met het errornummer. In het voorbeeldprogramma is de functie check_errno geschreven die deze returnwaarde controlleert en in geval van een error het programma afbreekt met een passende foutmelding.

void check_errno(int error) {
if (error < 0) {
perror("Error");
exit(EXIT_FAILURE);
}
}

Met behulp van de functie get kan een karakter uit de buffer worden gelezen.

char get(void) {
char c;
check_errno( sem_wait(&semFilled) ); // verlaag het aantal gevulde plaatsen. WACHT als er geen gevulde plaatsen zijn!
check_errno( sem_wait(&semMutualExclusive) ); // ga critische gebied binnen
c = buffer[indexGet]; indexGet++;
if (indexGet == SIZE) { indexGet = 0; } check_errno( sem_post(&semMutualExclusive) ); // verlaat critische gebied
check_errno( sem_post(&semEmpty) ); // verhoog het aantal lege plaatsen
return c;
}

De semaphoren worden in de main functie als volgt geïnitialiseerd:

   check_errno( sem_init(&semMutualExclusive, 0, 1) ); // 1 thread tegelijk toelaten in critische gebied
check_errno( sem_init(&semEmpty, 0, SIZE) ); // er zijn SIZE lege plaatsen
check_errno( sem_init(&semFilled, 0, 0) ); // er zijn 0 gevulde plaatsen

De twee producers gebruiken dezelfde code:

void* producer(void* arg) {
char c = *(char*)arg;
int i;
check_errno( sem_wait(&semPrintf) );
check_errno( printf("Thread: %d met arg: %c gestart\n", pthread_self(), c) );
check_errno( sem_post(&semPrintf) );
for (i = 0; i < 1000; ++i) {
put(c);
}
check_errno( sem_wait(&semPrintf) );
check_errno( printf("Thread: %d gestopt\n", pthread_self()) );
check_errno( sem_post(&semPrintf) ); return NULL;
}

Het karakter dat geproduceerd moet worden, wordt (bij het starten van de thread) als argument doorgegeven. Met behulp van de put functie worden 1000 karakters in de buffer geplaatst. Met behulp van een printf wordt het starten en stoppen van de thread gemeld. Er is een extra semaphore semPrintf gedefinieerd die er voor zorgt dat de uitvoer niet afgewisseld wordt met de uitvoer van andere threads.

De consumer is als volgt gecodeerd:

void* consumer(void* arg) {
int i;
char c;
check_errno( sem_wait(&semPrintf) );
check_errno( printf("Thread: %d gestart\n", pthread_self()) );
check_errno( sem_post(&semPrintf) );
for (i = 0; i < 2000; ++i) {
c = get();
check_errno( sem_wait(&semPrintf) );
check_errno( putchar(c) );
check_errno( sem_post(&semPrintf) );
}
check_errno( sem_wait(&semPrintf) );
check_errno( printf("Thread: %d gestopt\n", pthread_self()) );
check_errno( sem_post(&semPrintf) ); return NULL;
}

Er worden 2000 karakters uit de buffer gelezen met behulp van de functie get. Elk karakter wordt met de functie putchar op het scherm gezet. Ook wordt het starten en stoppen van de thread gemeld.

In de functie main zijn de volgende lokale variabelen gedefinieerd:

   struct sched_param sp, spc, spp1, spp2;
int p, prioc, priop1, priop2;
pthread_attr_t ptac, ptap1, ptap2;
pthread_t ptp1, ptp2, ptc;
char frikadel='F', kroket='K';

In de functie main krijgt de main thread een hoge prioriteit. De main thread moet de hoogste prioriteit hebben omdat deze thread de andere threads start en we willen de onderlinge wisselwerking van deze threads bestuderen.

Het wijzigen van de prioriteit van een thread is redelijk ingewikkeld. Eerst moeten met behulp van de functie pthread_getschedparam de scheduling parameters worden opgehaald. Deze scheduling parameters zijn van het type struct sched_param. In deze scheduling parameters kan dan de prioriteit worden gezet in het dataveld sched_priority. Tot slot moeten de gewijzigde scheduling parameters weer aan de thread worden doorgegeven met behulp van de functie pthread_setschedparam.

   check( pthread_getschedparam(pthread_self(), &p, &sp) );
sp.sched_priority = 60;
check( pthread_setschedparam(pthread_self(), SCHED_FIFO, &sp) );

De pthread_xxx functies hebben een return waarde van 0 als er geen fout is opgetreden. In het geval van een error wordt het errornummer teruggegeven. De standaard globale variabele errno wordt niet gevuld met het errornummer. In het voorbeeldprogramma is de functie check geschreven die de returnwaarde van een pthread functie controlleert en in geval van een error het programma afbreekt met een passende foutmelding.

void check(int error) {
if (error != 0) {
fprintf(stderr, "Error: %s\n", strerror(error));
exit(EXIT_FAILURE);
}
}

Volgens de IEEE Std 1003.1 kan de prioriteit van een thread ook op een eenvoudige wijze veranderd worden. Helaas wordt dit in QNX niet ondersteund.

   check( pthread_setschedprio(pthread_self(), 60) ); // Werkt wel volgens IEEE Std 1003.1 maar niet in QNX

In dit programma "bakt" de ene producer frikadellen (weergegeven met de letter F) en "bakt" de andere producer kroketten (weergegeven met de letter K). De consumer "eet" de frikadellen en kroketten op.

Bij het starten van het programma moeten op de command line 3 parameters worden meegegeven. Dit zijn achtereenvolgens de prioriteit van de consumer, de prioriteit van de frikadellen producer en de prioriteit van de kroketten producer. Deze prioriteiten worden opgeslagen in de variabelen prioc, priop1 en priop2.

Bij het starten van een thread kan via een variabele van het type pthread_attr_t onder andere de scheduling policy en de scheduling priority worden opgegeven. Zie eventueel paragraaf 12.6 van het theorieboek. Een variabele van het type pthread_attr_t kan worden geïnitialiseerd met de default waarden met behulp van de functie pthread_attr_init:

   check( pthread_attr_init(&ptac) );
check( pthread_attr_init(&ptap1) );
check( pthread_attr_init(&ptap2) );

Met behulp van de functie pthread_attr_setinheritsched wordt aangegeven dat bij het opstarten van de thread de scheduling priority en policy die in de thread attributes zijn opgegeven gebruikt moet worden. Als je het aanroepen van deze functie vergeet zal voor de op te starten thread altijd de prioriteit en policy van de parent thread gebruikt worden.

Met behulp van de functie pthread_attr_setschedpolicy wordt de scheduling policy opgegeven. In dit programma is gekozen voor de SHED_FIFO policy. Dit wil zeggen dat bij threads met gelijke prioriteit geen time slicing wordt gebruikt. Een thread wordt pas afgewisseld door een thread met gelijke prioriteit als de eerste thread zelf waiting wordt (bijvoorbeeld doordat een input/output bewerking is aangeroepen). Een andere scheduling policy die ook door QNX wordt ondersteund is SHED_RR. Bij deze round-robbin scheduling worden threads met gelijke prioriteit na het verstrijken van een time-slice afgewisseld.

   check( pthread_attr_setschedpolicy(&ptac, SCHED_FIFO) );
check( pthread_attr_setschedpolicy(&ptap1, SCHED_FIFO) );
check( pthread_attr_setschedpolicy(&ptap2, SCHED_FIFO) );

Het opgeven van de prioriteit waarmee een thread gestart moet worden is omslachtig. Eerst moeten met behulp van de functie pthread_attr_getschedparam de scheduling parameters worden opgehaald (uit de thread attributes). Deze scheduling parameters zijn van het type struct sched_param. In deze scheduling parameters kan dan de prioriteit worden gezet in het dataveld sched_priority. Tot slot moeten de scheduling parameters weer in de thread parameters worden opgeslagen met behulp van de functie pthread_attr_setschedparam.

   check( pthread_attr_getschedparam(&ptac, &spc) );
check( pthread_attr_getschedparam(&ptap1, &spp1) );
check( pthread_attr_getschedparam(&ptap2, &spp2) );

spc.sched_priority = prioc;
spp1.sched_priority = priop1;
spp2.sched_priority = priop2;

check( pthread_attr_setschedparam(&ptac, &spc) );
check( pthread_attr_setschedparam(&ptap1, &spp1) );
check( pthread_attr_setschedparam(&ptap2, &spp2) );

Nu de thread attributes zijn "ingesteld" kunnen de threads worden opgestart met behulp van de functie pthread_create:

   check( pthread_create(&ptc, &ptac, consumer, 0) );
check( pthread_create(&ptp1, &ptap1, producer, &frikadel) );
check( pthread_create(&ptp2, &ptap2, producer, &kroket) );

De thread ID's van deze threads zijn van het type pthread_t en worden in de variabelen ptc, ptp1 en ptp2 opgeslagen.

Vervolgens moet de main thread wachten totdat de andere threads geëindigd zijn. Bij het beëindigen van de main thread van een proces worden namelijk ook alle andere threads van het proces gestopt en wordt het gehele proces beëindigd.

   check( pthread_join(ptc, 0) );
check( pthread_join(ptp1, 0) );
check( pthread_join(ptp2, 0) );

Als alle threads beëindigd zijn worden de aangemaakte semaphoren en thread attributes opgeruimd:

   check_errno( sem_destroy(&semMutualExclusive) );
check_errno( sem_destroy(&semEmpty) );
check_errno( sem_destroy(&semFilled) );

check( pthread_attr_destroy(&ptac) );
check( pthread_attr_destroy(&ptap1) );
check( pthread_attr_destroy(&ptap2) );

Dit is in dit geval niet stikt noodzakelijk omdat bij het eindigen van het proces toch alles automatisch wordt opgeruimd.

Het complete voorbeeld programma in C en de bijbehorende makefile kun je vinden in de files opdr2.c en makefile.

Voorbeeld programma in C++.

De buffer uit het voorbeeldprogramma in C is niet erg herbruikbaar. In C++ kun je alle data en functies van de buffer "inkapselen" in de class CircularBuffer. Door een template te gebruiken kunnen we een herbuikbaar buffer definiëren dat met elk willekeurig type gebruikt kan worden. Ook het aantal plaatsen in de buffer kunnen we met behulp van een template parameter instelbaar maken.

template<typename T, int size>
class CircularBuffer {
public:
CircularBuffer(); // constructor zorgt voor de initialisatiie van de datamembers
~CircularBuffer();
void put(const T& t); // schrijf element in de buffer. WACHT als buffer vol is!
const T get(); // lees een element uit de buffer. WACHT als buffer leeg is!
private:
T buffer[size]; // buffer voor het opslaan van size elementen van het type T
int indexGet; // index waar het volgende element uit de buffer gelezen wordt
int indexPut; // index waar het volgende element in de buffer geschreven wordt
sem_t semMutualExclusive; // binaire semaphore: zorgt voor wederzijdse uitsluiting
sem_t semEmpty; // counting semaphore: telt aantal lege plaatsen
sem_t semFilled; // counting semaphore: telt aantal gevulde plaatsen
};

De semaphoren worden nu geïnitialiseerd in de constructor en opgeruimd in de destructor.

Het complete voorbeeld programma in C++ en de bijbehorende makefile kun je vinden in de files opdr2.cpp en makefile.

Programma analyse.

Opdracht 2a.

Compileer het programma opdr2.c of opdr2.cpp met de bijbehorende makefile en start dit programma met de prioriteiten: consumer = 22, frikadellen producer = 21 en kroketten producer = 20.

opdr2.out 22 21 20

Verklaar de uitvoer! Wees nauwkeurig in je verklaring. Verklaar waarom eerst alle frikadellen gebakken worden en pas daarna de kroketten. Verklaar waarom er geen frikadellen geconsumeerd worden nadat de frikadellenbakker gestopt is.

Om het programma te analyseren kun je eventueel gebruik maken van de DDD debugger.

Maak aantekeningen zodat de docent deze opdracht vlot kan aftekenen.

De code van de consumer lijkt inefficient:

In C:

      c = get();
check_errno( sem_wait(&semPrintf) );
check_errno( putchar(c) );
check_errno( sem_post(&semPrintf) );

In C++:

      char c(b.get());
check_errno( sem_wait(&semCout) );
cout << c;
check_errno( sem_post(&semCout) );

Het gebruik van de lokale variabele lijkt overbodig.

In C:

      check_errno( sem_wait(&semPrintf) );
check_errno( putchar(get()) );
check_errno( sem_post(&semPrintf) );

In C++:

      check_errno( sem_wait(&semCout) );
cout << b.get();
check_errno( sem_post(&semCout) );

Opdracht 2b.

Pas de consumer code in het programma opdr2.c of opdr2.cpp aan zoals hierboven besproken. Compileer het programma en voer het uit.

opdr2.out 22 21 20

Verklaar de uitvoer! (Als het "goed" is loopt het programma vast.)

Maak aantekeningen zodat de docent deze opdracht vlot kan aftekenen.

Opdracht 2c.

Compileer het (orginele) programma opdr2.c of opdr2.cpp met de bijbehorende makefile en start dit programma met de prioriteiten: consumer = 20, frikadellen producer = 21 en kroketten producer = 22.

opdr2.out 20 21 22

Verklaar de uitvoer! Wees nauwkeurig in je verklaring. Verklaar waarom eerst alle kroketten gebakken worden en pas daarna de frikadellen. Verklaar waarom er nog kroketten geconsumeerd worden nadat de krokettenbakker gestopt is. Verklaar waarom dit er 17 zijn! Er passen er toch maar 16 in de buffer?

Maak aantekeningen zodat de docent deze opdracht vlot kan aftekenen.

Opdracht 2d.

Probeer nu eens te voorspellen (zonder het programma te runnen) wat er gebeurd bij de prioriteiten: consumer = 20, frikadellen producer = 21 en kroketten producer = 21.

Denk goed na!

Test je voorspelling door het programma uit te voeren.

opdr2.out 20 21 21

Verklaar de uitvoer! Wees nauwkeurig in je verklaring. Verklaar waarom eerst 17 frikadellen gebakken worden. Verklaar waarom het consumeren van frikadellen en kroketten daarna steeds afwisseld. Verklaar waarom er nadat de bakkers gestopt zijn nog 17 kroketten geconsumeerd worden.

Maak aantekeningen zodat de docent deze opdracht vlot kan aftekenen.

Alternatieve implementatie.

Om threads met elkaar te synchroniseren kunnen we in plaats van semaphoren ook mutexen en conditionele variabelen gebruiken. Zie eventueel paragraaf 5.7 in het theorieboek. De relevante IEEE Std 1003.1 functies zijn:

Opdracht 2e.

Vervang in het voorbeeld programma de synchronisatie met semaphoren door synchronisatie met mutexen en conditionele variabelen.

Verder met opdracht 3...