|
Вопрос # 624/ вопрос открыт / |
|
Здравствуйте, уважаемые эксперты!
Здравствуйте :) Зараннее прошу извинить за изложение - я новичок как в программированиии так и в написаниях сообщ. на форумах.
Очередная (наверное :)) головоломка по потокам.
Для выполнения множества динамических долгих по выполнению (сервера удаленные) SQL-запросов, что формируются как колонка в результате другого SQL-запроса, решено было создать пул из одновременно выполняющих запросы потоков. Итак, основной поток организовывал прокрутку запросов, а пул подпотоков выполнял.
Вот основные моменты кода:
(приложение)
Итак, главные объекты
hmMain - хэндл на мютекс для синхронизации данных (в частности, счетчика в MySemaphore) между основным и подчиненными потоками
MySemaphore - для временного замораживания осн. потока в случае перегрузки пула подпотоков.
Где я ошибся? Почему счетчик семафора "переполняется" и вылетает в п. 2 (подписано - есть комментарий)
Приложение: Переключить в обычный режим-
-
- procedure TfrmMain.Run;
- var
- i, RecordCount : integer;
- begin
-
- //
- SetLength(ActiveThreads, seThreadsCount.Value);
- try
- for i := 0 to seThreadsCount.Value-1 do
- begin
- ActiveThreads<i>.thr := nil;
- ActiveThreads<i>.Status:=tsNil;
- end;
-
- if (hmMain<>0)then
- try
-
- try
- sqlqOperators.First;
- i:=0;
- while not sqlqOperators.Eof do
- begin
- CreateQueryThread(i);
- Application.ProcessMessages;
- sqlqOperators.Next;
- inc(i);
- end;
- WaitForAllQueryThreadsDone;
- CheckForQueryResults;
- finally
- MySemaphore.Free;
- end;
- finally
- CloseHandle(hmMain);
- end;
- finally
- Finalize(ActiveThreads);
- end;
- end;
-
-
-
- function TfrmMain.CreateQueryThread(Counter : Integer): Integer;
- var
- nth : TThreadQueryVRNTK;
- i : Integer;
- s:string;
- begin
- AddLog('Main before MySemaphore.CheckBeforeAdd');
-
- AddLog('Main after MySemaphore.CheckBeforeAdd');
- WaitForMainBlock;
- AddLog('Main after WaitForMainBlock');
- try
- if (MySemaphore.AddRef>MySemaphore.MaxCount) then
- begin
- s:='MySemaphore.AddRef>MySemaphore.MaxCount';
- AddLog(s);
-
- end;
- nth := nil;
- Result:=-1;
- for i := 0 to Length(ActiveThreads)-1 do
- begin
- case ActiveThreads<i>.Status of
- tsNil :
- begin
- nth := TThreadQueryVRNTK.Create(True);
- nth.ConnectionString:=Copy(sqlcOperators.Params.Text, 1,
Length(sqlcOperators.Params.Text));
- ActiveThreads<i>.thr:=nth;
- nth.ThreadNumber:=i;
- nth.FreeOnTerminate:=True;
- end;
-
- nth := ActiveThreads<i>.thr;
- tsRunning:
- Continue;
- end;
- nth.SQL := Copy(sqlqOperators.FieldByName('SQL').AsString, 1,
Length(sqlqOperators.FieldByName('SQL').AsString));
- nth.QueryNumber:=Counter;
- ActiveThreads<i>.Status:=tsRunning;
- Result:=i;
- break;
- end;
-
- if (nth=nil) then
- begin
- s:='(';
- for i := 0 to Length(ActiveThreads)-1 do
- s:=s+Format('[PoolID=%d, QueryNumb=%d], ', [i,
ActiveThreads<i>.thr.QueryNumber]);
- s:=s+')';
- s:=Format('Error creating thread for query %d. Nth is nil. Result=%d. ActiveThreads
are:#13#10%s', [Counter+1, Result, s]);
- AddLog(s);
- raise exception.Create(s);
- end
- else
- NewQueryRunning(nth);
- if (i>Length(ActiveThreads)) then
-
- finally
- ReleaseMainBlock(nil);
- end;
-
-
- end;
-
-
-
- function TMySemaphore.AddRef: Integer;
- begin
- inc(FCounter);
- Result := FCounter;
- end;
-
- procedure TMySemaphore.Release;
- begin
- dec(FCounter);
- end;
-
- procedure TMySemaphore.CheckBeforeAdd;
- begin
- if (FCounter>=FMaxCount) then
- begin
- if (WaitForSingleObject(FhEvent, INFINITE)<>WAIT_OBJECT_0) then
- raise Exception.Create('WaitForSingleObject(FhEvent,
INFINITE)<>WAIT_OBJECT_0');
- ResetEvent(FhEvent);
- end;
- end;
-
- procedure TMySemaphore.CheckAfterRelease;
- begin
- if (FCounter>=FMaxCount-1) then
- SetEvent(FhEvent);
- end;
-
- constructor TMySemaphore.Create(Count : Integer);
- begin
- FMaxCount:=Count;
- FCounter:=0;
- FhEvent := CreateEvent(nil, True, False, nil);
- end;
-
-
-
- procedure TThreadQueryVRNTK.Execute;
- begin
- try
- //SetName;
-
-
- //raise(Exception.CreateFmt('Thread=%d', [GetCurrentThreadId]));
- finally
-
- end;
- end;
-
-
-
- procedure TfrmMain.ReleaseQueryThread(athq: TThreadQueryVRNTK);
- begin
- //athq.AddLog(Format('Thread %d before ReleaseQueryThread', [athq.ThreadID]));
- WaitForMainBlock;
- try
- MySemaphore.Release;
-
- finally
- ReleaseMainBlock(athq);
- MySemaphore.CheckAfterRelease;
- end;
- //athq.AddLog(Format('Thread %d after ReleaseQueryThread', [athq.ThreadID]));
- end;
-
-
-
-
- procedure TfrmMain.ReleaseMainBlock(Athr:TThread);
- var
- RelRes:BOOL;
- begin
- RelRes := ReleaseMutex(hmMain);
- if (not RelRes) then
- //if (Athr<>nil) then
- Win32Check(RelRes);
- end;
-
- procedure TfrmMain.WaitForMainBlock;
- var
- WaitRes:integer;
- begin
- //while
- WaitRes:=WaitForSingleObject(hmMain, INFINITE);
- if (WaitRes<>WAIT_OBJECT_0) then //do;
- raise(Exception.CreateFmt('res=%d', [WaitRes]));
- end;
 |
Вопрос задал: Иванов Сергей - 1109 (статус: Посетитель)
Вопрос отправлен: 11 июня 2007, 18:45
Состояние вопроса: открыт, ответов: 1.
|
Ответ #1. Отвечает эксперт: Вадим К
Здравствуйте, Иванов Сергей!
Вы сделали свою реализацию семафора, а она не потокобезопасная. Почему бы не использовать стандартную? А так как она не является безопасной, то она и не работает. Точнее работает, но не всегда.
 |
Ответ отправил: Вадим К (статус: Академик)
Время отправки: 11 июня 2007, 19:24
Оценка за ответ: 2
Комментарий к оценке: Где потоковая опасность в приведенных участках кода??? Ответ нужно разьяснить. Ибо с таким успехом и сам TThread можно считать потокоопасным. Хотя по-моему тут и есть доля истины. Но нужно прочувствовать сценарий.
|
Мини-форум вопроса
Всего сообщений: 5; последнее сообщение — 12 июня 2007, 13:05; участников в обсуждении: 2.
|
Иванов Сергей - 1109 (статус: Посетитель), 11 июня 2007, 22:00 [#1]:
М-да... Похоже, нашел deadlock... Алгоритм, вроде, такой (Main-гл. поток, Sub-подчин.):
1) (Main) AddRef
2) (Sub) CheckAfterRelease -> SetEvent
3) (Main) bla-bla
4) (Main) CheckBeforeAdd ->WaitForSingleObject - а воно в состоянии signaled!!!!!!!
А кто-нить знает как эту байду переписать с минимальным изменением кода?
|
|
Иванов Сергей - 1109 (статус: Посетитель), 12 июня 2007, 10:36 [#2]:
Переставил в 5). Сделал что-то вроде:
try
MySemaphore.CheckAfterRelease;
finally
ReleaseMainBlock(athq);
end;
- не помогло тоже. Алгоритм завала подобен прошлому. А именно:
1) (Sub) CheckAfterRelease : FCount=FMax-1 => SetEvent
2) (Main) Создать новый поток CreateQueryThread => FCount=FMax
3) (Main) Ещё раз создать новый поток CreateQueryThread - пройдет, поскольку Event в состоянии signalled.
Есть, дальше, два варианта
1) ResetEvent из семафора перенести в защищенную часть кода (в AddRef по условию, скажем).
2) Денйствительно, переписать всё на системный семафор. Но мне кажется, что можно и без него обойтись (поскольку последний решает чуть другие проблемы - и гемморой опять обеспечен)
|
|
Иванов Сергей - 1109 (статус: Посетитель), 12 июня 2007, 10:42 [#3]:
Ну что же. В варианте
function TMySemaphore.AddRef: Integer;
begin
inc(FCounter);
if (FCounter=FMaxCount) then
ResetEvent(FhEvent);
Result := FCounter;
end;
всё работает! Интуитивно ясно, но не уверен...
|
|
Вадим К (статус: Академик), 12 июня 2007, 11:14 [#4]:
Новичок, а такой грубый.
>>Где потоковая опасность в приведенных участках кода???
Я чётко сказал - в реализации семафора.
>>Ответ нужно разьяснить
Ну я решил, что после такого вопроса (сложноватого достаточно. Новичок такое не сделает) мой ответ будет понятен.
Сама небезопасность находиться в следующих строках
---
if (FCounter>=FMaxCount-1) then
SetEvent(FhEvent);
--
if (FCounter>=FMaxCount) then
begin
if (WaitForSingleObject(FhEvent, INFINITE)<>WAIT_OBJECT_0) then
raise Exception.Create('WaitForSingleObject(FhEvent, INFINITE)<>WAIT_OBJECT_0');
ResetEvent(FhEvent);
Если бы внимательно почитали Рихтера, то знали бы, что Windows имеет полное право прервать ваш поток в любом месте. Тоесть функции и процедуры не атомарны. Поэтому в случае с
if (FCounter>=FMaxCount-1) then
SetEvent(FhEvent);
Условие может провериться и оно пускай выполняется, а в этот момент другой поток прервал и тоже проверил условие. оно естественно тоже выполнилось и установил событие. теперь выполнился третий поток и изменил FCounter так, что условие уже не выполняется. А теперь первый поток снова получает управление. и тут засада. Условие уже не выполняется, а событие устанавливается. Подумайте над этим
Галочка "подтверждения прочтения" - вселенское зло.
|
|
Иванов Сергей - 1109 (статус: Посетитель), 12 июня 2007, 13:05 [#5]:
Вадим К, прошу извинить. Но тем не менее я просто рассчитывал именно на сценарий. Вроде как разобрался. Спасибо.
>>
if (FCounter>=FMaxCount-1) then
SetEvent(FhEvent);
стала атомарной в варианте № 2 (села под общий мютекс). А вот с ResetEvent я ещё долго возился.
Самое интересное, что , вроде, для подобной организации можно обойтись таки одним объектом синхронизации, но до этого я ещё не "дожил" . Ещё раз спасибо.
|
Чтобы оставлять сообщения в мини-форумах, Вы должны авторизироваться на сайте.
|