5b. Конкуренция: взаимно изключване и синхронизация

5.4 Семафори
   Основен принцип: два или повече процеса се кооперират в смисъл на прости сигнали така, че един процес може да бъде спрян на определено място и да остане в това състояние, докато не получи съответен сигнал.
   За да изпрати сигнал чрез семафора  s, процесът трябва да изпълни примитива (процедурата) signal(s). За да получи сигнал от семафор s, процесът трябва да изпълни wait(s)  и ако съответният сигнал още не е изпратен, да остане в състояние на чакане (блокиран).
Дефинирани са 3 действия с променливата семафор:
   1. Инициализира се с неотрицателно число;
   2. Примитива wait(s) намалява стойността на s с 1. Ако стойността стане отрицателна, процесът (изпълняващ wait) се блокира.
   3. Операцията signal(s) увеличава стойността на s с 1. Ако стойността не е положителна, процесът (блокиран с wait) се разблокира.
    Примитивите wait и signal се предполат неделими, т.е. те те не могат да бъдат прекъсвани.
struct semaphore {
 int count;
 queueType queue;
};

void wait(semaphore s)
{
 s.count--;
 if (s.count < 0)
 {
  place this process in s.queue;
  block this process;
 }
}
void signal(semaphore s)
{
 s.count++;
 if (s.count <= 0)
 {
  remove a process P from s.queue;
  place process P on ready list;
 }
}
Двоични семафори:
struct binary_semaphore {
enum (zero, one) value;
queueType queue;
};
void waitB(binary_semaphore s)
{
 if (s.value==one) s.value=zero;
 else
 {
  place this process in s.queue;
  block this process;
 }
}
void signalB(binary_semaphore s)
{
 if (s.queue.is_empty()) s.value=one;
 else
 {
  remove a process P from s.queue;
  place process P on ready list;
 }
}
** Взаимно изключване - решение на задачата за ВИ със семафори:
/* program mutualexclusion */
const int n = /* number of processes */;
semaphore s = 1;
void P(int i)
{
 while (true)
 {
  wait(s);
  /* critical section */
  signal(s);
  /* remainder */
 }
}
void main()
{
 parbegin(P(1), P(2),..., P(n));
}
    Стойността на променливата s.count има следния смисъл:
- s.count >= 0 е броят на процесите, които могат да изпълнят примитива wait без да бъдат блокирани (и няма изпълнение на signal);
- s.count < 0 е броят на процесите, чакащи в s.queue.

** Задачата за производител/потребител.
    Един или няколко производители генерират данни (записи, символи) и ги поставят в един буфер. Един потребител взема по една данна от буфера. Само един агент (производител или потребител) има достъп да буфера в даден момент.
* Първи опит за решение със семафори:
/* program producerconsumer */
int n;
binary_semaphore s = 1;
binary_semaphore delay = 0;

void producer()
{
 while (true)
 {
  produce();
  waitB(s);
  append();  n++;
  if (n==1) signalB(delay);
  signalB(s);
 }
}
void consumer()
{
 waitB(delay);
 while (true)
 {
  waitB(s);
  take();  n--;
  signalB(s);
  consume();
  if (n==0) waitB(delay);
 }
}
void main()
{ n = 0;
  parbegin(producer, consumer);
}
    Семафорът s служи за взаимно изключване, семафорът delay - за чакане на потребителя когато буферът е празен. Проблем:
     
n
delay
1
Init
 
0
0
2
Producer
КС
1
1
3
Consumer
waitB(delay)
1
0
4
Consumer
КС
0
0
5
Producer
КС
1
1
6
Consumer
if (n==0) waitB(delay)
1
1
7
Consumer
КС
0
1
8
Consumer
if (n==0) waitB(delay)
0
0
9
Consumer
КС
-1
0

* Втори опит - решение:
int n;
binary_semaphore s = 1;
binary_semaphore delay = 0;

void producer()
{
 while (true)
 {
  produce();
  waitB(s);
  append();n++;
  if (n==1) signalB(delay);
  signalB(s);
 }
}
void consumer()
{ int m;  /* a local variable */
  waitB(delay);
  while (true)
  {
   waitB(s);
   take(); n--;  m = n;
   signalB(s);
   consume();
   if (m==0) waitB(delay);
  }
}
void main()
{ n = 0;
  parbegin (producer, consumer);
}
--- FIGURE 5.16 ---
Варианти ва задачата - безкраен буфер и краен кръгов буфер.

** Задачата на бръснаря
--- FIGURE 5.19 ---



5.5 Монитори.
** Монитор със сигнал.
    Мониторите са програмни модули, състоящи се от една или повече процедури, инициализация и локални данни. Главни характеристики:
   1. Локалните данни са достъпни само от процедурите на монитора.
   2. Един процес "влиза в монитора" като извика някоя от процедурите му.
   3. Само един процес може да се изпълнява в монитора, всеки друг процес чака да се "освободи" монитора.
    Променливи на условия (condition variables). Функции с тези променливи:
- cwait(c):  Прекратяване изпълнението на процеса с условие c.  Мониторът е достъпен за друг процес;
- csignal(c): Въобновяване на изпълнението на процеса, прекратен с cwait със същото условие. Ако има няколко такива процеса се избира един от тях, ако няма - нищо не се прави.
    Разлика със семафорите - сигналът се "загубва", ако няма чакащи процеси за този сигнал.
--- FIGURE 5.22 ---
/* program producerconsumer */
monitor boundedbuffer;
char buffer[N];         /* space for N items */
int nextin, nextout;    /* buffer pointers */
int count;              /* number of items in buffer */
int notfull, notempty;  /* for synchronization */
void append (char x)
{
 if (count == N)
 cwait(notfull); /* buffer is оull;*/
                 /* avoid overflow */
 buffer[nextin] = x;
 nextin = (nextin + 1) % N;
 count++; /* one more item in buffer */
 csignal(notempty); 
/* resume any  waiting consumer */
}
void take (char x)
{
 if (count == 0)
 cwait(notempty); /* buffer is empty;*/
                  /* avoid underflow */
 x = buffer[nextout];
 nextout = (nextout + 1) % N;
 count--; /* one fewer item in buffer */
 csignal(notfull); 
/* resume any  waiting producer */
}
{
 /* monitor body */
 nextin = 0; nextout = 0; count = 0; /* buffer initially empty */
}
void producer()
{ char x;
  while (true)
  {
   produce(x);
   append(x);
  }
}
void consumer()
{ char x;
  while (true)
  {
   take(x);
   consume(x);
  }
}
void main()
{ parbegin(producer, consumer);
}
** Монитори със съобщения и разпространение.


5.6 Предаване на съобщения.
    Синхронизация и комуникация са двата проблема на взаимодействащи си процеси - взаимно изключване и обмен на информация. Двете задачи се решават с обмен на съобщения. Дефинират се два примитива (процедури):
send(destination, message)
receive(source, message)
Съществуват различини възможности за реализация на тези 2 процедури:
 
Synchronization Addressing  Format
Send Direct Content
    blocking     send Length 
    nonblocking     receive     fixed
Receive         explicit     variable
    blocking         implicit
    nonblocking Indirect
    test for arrival     static Queuing Discipline
    dynamic FIFO
    ownership Priority
** Синхронизация.
    Send -  процесът се блокира, докато събощението бъде получено (blocking send) или процесът не се блокира (nonblocking send).
    Receive - ако съобщението е изпратено, то се получава и процесът продължава. Ако не е изпратено: процесът се блокира до изпращане и получавене на съобщението (blocking receive) или процесът продължава, изоставяйки опитите за получаването му (nonblocking receive).
    Възможни са 3 комбинации :
-- Blocking send, blocking receive;
-- Nonblocking send, blocking receive - най-често срещано, при грешка, могат де се генерират много съобщения;
-- Nonblocking send, nonblocking receive.
** Адресиране.
--- FIGURE 5.24 ---
** Формат на съобщенията.
--- FIGURE 5.25 ---
** Дисциплина на опашките.


5.7 Задачата на писатели/читатели
    Данни (файл, блок от паметта) се използват от няколко процеса. Някои от процесите само четат тези данни - читетили, а други само променят данните - писатели. Следните условия трябва де се спазват:
   1. Произволен брой читатели могат да четат едновременно данните;
   2. Само един писател може да пише данни;
   3. Ако писател пише, никой читател не може да чете.
Ако всеки процес може и да пише и да чете, общото решение за ВИ работи.
Ако един процес може или само да чете, или само да пише - общото решение е неприемливо, а съществуват и много по-ефективни решения.
** Предимство на читателите.
/* program readersandwriters */
int readcount;
semaphore x = 1, wsem = 1;
void reader()
{ while (true)
  {
   wait(x);
   readcount++;
   if (readcount == 1) wait(wsem);
   signal(x);
   READUNIT();
   wait(x);
   readcount--;
   if (readcount == 0) signal(wsem);
   signal(x);
  }
}
void writer()
{
 while (true)
 {
  wait(wsem);
  WRITEUNIT();
  signal(wsem);
 }
}
 
 

 

void main()
{ readcount = 0;
  parbegin(reader, writer);
}
** Предимство на писателите.
/* program readersandwriters */
int readcount, writecount;
semaphore x = 1, y = 1, z = 1, wsem = 1, rsem = 1;
 
void reader()
{ while (true)
  {
   wait(z);
   wait(rsem);
   wait(x);
   readcount++;
   if (readcount == 1) wait(wsem);
   signal(x);
   signal(rsem);
   signal(z);
   READUNIT();
   wait(x);
   readcount--;
   if (readcount == 0) signal(wsem);
   signal(x);
  }
}
void writer ()
{
 while (true)
 {
  wait(y);
  writecount++;
  if (writecount == 1) wait(rsem);
  signal(y);
  wait(wsem);
  WRITEUNIT();
  signal(wsem);
  wait(y);
  writecount--;
  if (writecount == 0) signal(rsem);
  signal(y);
 }
}
void main()
{
 readcount = writecount = 0;
 parbegin (reader, writer);
}
    Предназначение на семафорите:
- x - коректната работа с readcount;
- y - коректната работа с writecount;
- z -
- rsem - свободно за четене (няма писател);
- wsem - свободно за писане (няма читатели).

*  Решение на задачата с размяна на съобщения.

void readri()
{
 message: rmsg;
 while (true)
 {
  rmsg = i;
  send(readrequest, rmsg);
  receive(mboxi, rmsg);
  READUNIT;
  rmsg = i;
  send(finished, rmsg);
 }
}
void writerj()
{
 message: rmsg;
 while (true)
 {
  rmsg = i;
  send(writerequest, rmsg);
  receive(mboxj, rmsg);
  WRITEUNIT;
  rmsg = i;
  send(finished, rmsg);
 }
}

void controller()
{ while(true)
  { if (count > 0)
    { if (not empty(finished))
      { receive(finished, msg);
        count++;
      }
      else if (not empty(writerequest))
      { receive(writerequest, msg);
        writer.id = msg.id;
        count -= 100;
      }
      else if (not empty(readrequest))
      { receive(readrequest, msg);
        count--;
        send(msg.id, "OK");
      }
    }
    if (count==0)
    { send(writer.id, "OK");
      receive(finished, msg);
      count = 100
    }
    while (count<0)
    { receive(finished, msg);
      count++;
    }
  }
}