| 
| 
 | Вопрос # 624/ вопрос открыт / | 
 |  Здравствуйте, уважаемые эксперты!Здравствуйте :) Зараннее прошу извинить за изложение - я новичок как в программированиии так и в написаниях сообщ. на форумах.
 Очередная (наверное :)) головоломка по потокам.
 Для выполнения множества динамических долгих по выполнению (сервера удаленные) SQL-запросов, что формируются как колонка в результате другого SQL-запроса, решено было создать пул из одновременно выполняющих запросы потоков. Итак, основной поток организовывал прокрутку запросов, а пул подпотоков выполнял.
 
 Вот основные моменты кода:
 
 (приложение)
 
 Итак, главные объекты
 hmMain - хэндл на мютекс для синхронизации данных (в частности, счетчика в MySemaphore) между основным и подчиненными потоками
 MySemaphore - для временного замораживания осн. потока в случае перегрузки пула подпотоков.
 
 Где я ошибся? Почему счетчик семафора "переполняется" и вылетает в п. 2 (подписано - есть комментарий)
 Приложение:Переключить в обычный режим   procedure TfrmMain.Run;var  i, RecordCount : integer;begin   //  SetLength(ActiveThreads, seThreadsCount.Value);  try    for i := 0 to seThreadsCount.Value-1 do    begin      ActiveThreads<i>.thr := nil;      ActiveThreads<i>.Status:=tsNil;    end;     if (hmMain<>0)then      try         try          sqlqOperators.First;          i:=0;          while not sqlqOperators.Eof do          begin            CreateQueryThread(i);            Application.ProcessMessages;            sqlqOperators.Next;            inc(i);          end;          WaitForAllQueryThreadsDone;           CheckForQueryResults;        finally          MySemaphore.Free;        end;      finally        CloseHandle(hmMain);      end;  finally     Finalize(ActiveThreads);  end;end;   function TfrmMain.CreateQueryThread(Counter : Integer): Integer;var  nth : TThreadQueryVRNTK;  i : Integer;  s:string;begin  AddLog('Main before MySemaphore.CheckBeforeAdd');   AddLog('Main after MySemaphore.CheckBeforeAdd');  WaitForMainBlock;  AddLog('Main after WaitForMainBlock');  try    if (MySemaphore.AddRef>MySemaphore.MaxCount) then    begin      s:='MySemaphore.AddRef>MySemaphore.MaxCount';      AddLog(s);     end;    nth := nil;    Result:=-1;    for i := 0 to Length(ActiveThreads)-1 do    begin      case ActiveThreads<i>.Status of        tsNil :        begin          nth := TThreadQueryVRNTK.Create(True);          nth.ConnectionString:=Copy(sqlcOperators.Params.Text, 1,
Length(sqlcOperators.Params.Text));          ActiveThreads<i>.thr:=nth;          nth.ThreadNumber:=i;          nth.FreeOnTerminate:=True;        end;           nth := ActiveThreads<i>.thr;        tsRunning:          Continue;      end;      nth.SQL := Copy(sqlqOperators.FieldByName('SQL').AsString, 1,
Length(sqlqOperators.FieldByName('SQL').AsString));      nth.QueryNumber:=Counter;      ActiveThreads<i>.Status:=tsRunning;      Result:=i;      break;    end;     if (nth=nil) then      begin        s:='(';        for i := 0 to Length(ActiveThreads)-1 do          s:=s+Format('[PoolID=%d, QueryNumb=%d], ', [i,
ActiveThreads<i>.thr.QueryNumber]);        s:=s+')';        s:=Format('Error creating thread for query %d. Nth is nil. Result=%d. ActiveThreads
are:#13#10%s', [Counter+1, Result, s]);        AddLog(s);        raise exception.Create(s);      end    else      NewQueryRunning(nth);    if (i>Length(ActiveThreads)) then   finally    ReleaseMainBlock(nil);  end;  end;   function TMySemaphore.AddRef: Integer;begin  inc(FCounter);  Result := FCounter;end; procedure TMySemaphore.Release;begin  dec(FCounter);end; procedure TMySemaphore.CheckBeforeAdd;begin  if (FCounter>=FMaxCount) then  begin    if (WaitForSingleObject(FhEvent, INFINITE)<>WAIT_OBJECT_0) then      raise Exception.Create('WaitForSingleObject(FhEvent,
INFINITE)<>WAIT_OBJECT_0');    ResetEvent(FhEvent);  end;end; procedure TMySemaphore.CheckAfterRelease;begin  if (FCounter>=FMaxCount-1) then    SetEvent(FhEvent);end; constructor TMySemaphore.Create(Count : Integer);begin  FMaxCount:=Count;  FCounter:=0;  FhEvent := CreateEvent(nil, True, False, nil);end;   procedure TThreadQueryVRNTK.Execute;begin  try    //SetName;      //raise(Exception.CreateFmt('Thread=%d', [GetCurrentThreadId]));  finally   end;end;   procedure TfrmMain.ReleaseQueryThread(athq: TThreadQueryVRNTK);begin  //athq.AddLog(Format('Thread %d before ReleaseQueryThread', [athq.ThreadID]));  WaitForMainBlock;  try    MySemaphore.Release;   finally    ReleaseMainBlock(athq);    MySemaphore.CheckAfterRelease;   end;  //athq.AddLog(Format('Thread %d after ReleaseQueryThread', [athq.ThreadID]));end;    procedure TfrmMain.ReleaseMainBlock(Athr:TThread);var  RelRes:BOOL;begin  RelRes := ReleaseMutex(hmMain);  if (not RelRes) then    //if (Athr<>nil) then      Win32Check(RelRes);end; procedure TfrmMain.WaitForMainBlock;var WaitRes:integer;begin  //while  WaitRes:=WaitForSingleObject(hmMain, INFINITE);  if (WaitRes<>WAIT_OBJECT_0) then //do;    raise(Exception.CreateFmt('res=%d', [WaitRes]));end;
|  |   Вопрос задал: Иванов Сергей - 1109 (статус: Посетитель)Вопрос отправлен: 11 июня 2007, 18:45
 Состояние вопроса: открыт, ответов: 1.
 |  Ответ #1. Отвечает эксперт: Вадим К Здравствуйте, Иванов Сергей!Вы сделали свою реализацию семафора, а она не потокобезопасная. Почему бы не использовать стандартную? А так как она не является безопасной, то она и не работает. Точнее работает, но не всегда.
 
|  | Ответ отправил: Вадим К (статус: Академик)Время отправки: 11 июня 2007, 19:24
 Оценка за ответ: 2
 Комментарий к оценке: Где потоковая опасность в приведенных участках кода??? Ответ нужно разьяснить. Ибо с таким успехом и сам TThread можно считать потокоопасным. Хотя по-моему тут и есть доля истины. Но нужно прочувствовать сценарий. |  
 Мини-форум вопросаВсего сообщений: 5; последнее сообщение — 12 июня 2007, 13:05; участников в обсуждении: 2. 
|   | Иванов Сергей - 1109 (статус: Посетитель), 11 июня 2007, 22:00 [#1]:М-да... Похоже, нашел deadlock... Алгоритм, вроде, такой (Main-гл. поток, Sub-подчин.): 1) (Main) AddRef
 2) (Sub) CheckAfterRelease -> SetEvent
 3) (Main) bla-bla
 4) (Main) CheckBeforeAdd ->WaitForSingleObject - а воно в состоянии signaled!!!!!!!
 А кто-нить знает как эту байду переписать с минимальным изменением кода?
 |  
|   | Иванов Сергей - 1109 (статус: Посетитель), 12 июня 2007, 10:36 [#2]:Переставил в 5). Сделал что-то вроде: try
 MySemaphore.CheckAfterRelease;
 finally
 ReleaseMainBlock(athq);
 end;
 - не помогло тоже. Алгоритм завала подобен прошлому. А именно:
 1)  (Sub) CheckAfterRelease : FCount=FMax-1 => SetEvent
 2) (Main) Создать новый поток CreateQueryThread => FCount=FMax
 3) (Main) Ещё раз создать новый поток CreateQueryThread - пройдет, поскольку Event в состоянии signalled.
 Есть, дальше, два варианта
 1) ResetEvent из семафора перенести в защищенную часть кода (в AddRef по условию, скажем).
 2) Денйствительно, переписать всё на системный семафор. Но мне кажется, что можно и без него обойтись (поскольку последний решает чуть другие проблемы - и гемморой опять обеспечен)
 |  
|   | Иванов Сергей - 1109 (статус: Посетитель), 12 июня 2007, 10:42 [#3]:Ну что же. В варианте function TMySemaphore.AddRef: Integer;
 begin
 inc(FCounter);
 if (FCounter=FMaxCount) then
 ResetEvent(FhEvent);
 Result := FCounter;
 end;
 всё работает! Интуитивно ясно, но не уверен...
 |  
|   | Вадим К (статус: Академик), 12 июня 2007, 11:14 [#4]:Новичок, а такой грубый. >>Где потоковая опасность в приведенных участках кода???
 Я чётко сказал - в реализации семафора.
 >>Ответ нужно разьяснить
 Ну я решил, что после такого вопроса (сложноватого достаточно. Новичок такое не сделает) мой ответ будет понятен.
 
 Сама небезопасность находиться в следующих строках
 ---
 if (FCounter>=FMaxCount-1) then
 SetEvent(FhEvent);
 --
 if (FCounter>=FMaxCount) then
 begin
 if (WaitForSingleObject(FhEvent, INFINITE)<>WAIT_OBJECT_0) then
 raise Exception.Create('WaitForSingleObject(FhEvent, INFINITE)<>WAIT_OBJECT_0');
 ResetEvent(FhEvent);
 
 Если бы внимательно почитали Рихтера, то знали бы, что Windows имеет полное право прервать ваш поток в любом месте. Тоесть функции и процедуры не атомарны. Поэтому в случае с
 if (FCounter>=FMaxCount-1) then
 SetEvent(FhEvent);
 Условие может провериться и оно пускай выполняется, а в этот момент другой поток прервал и тоже проверил условие. оно естественно тоже выполнилось и установил событие. теперь выполнился третий поток и изменил FCounter так, что условие уже не выполняется. А теперь первый поток снова получает управление. и тут засада. Условие уже не выполняется, а событие устанавливается. Подумайте над этим
 Галочка "подтверждения прочтения" - вселенское зло. |  
|   | Иванов Сергей - 1109 (статус: Посетитель), 12 июня 2007, 13:05 [#5]:Вадим К, прошу извинить. Но тем не менее я просто рассчитывал именно на сценарий. Вроде как разобрался. Спасибо. >>
 if (FCounter>=FMaxCount-1) then
 SetEvent(FhEvent);
 стала атомарной в варианте № 2 (села под общий мютекс). А вот с ResetEvent я ещё долго возился.
 Самое интересное, что , вроде, для подобной организации можно обойтись таки одним объектом синхронизации, но до этого я ещё не "дожил"
  . Ещё раз спасибо. |  Чтобы оставлять сообщения в мини-форумах, Вы должны авторизироваться на сайте. |