Экспертная система Delphi.int.ru

Сообщество программистов
Общение, помощь, обмен опытом

Логин:
Пароль:
Регистрация | Забыли пароль?

Delphi.int.ru Expert

Другие разделы портала

Переход к вопросу:

#   

Статистика за сегодня:  


Лучшие эксперты

Подробнее »



Вопрос # 624

Раздел: Delphi » Прочее
/ вопрос открыт /

Здравствуйте, уважаемые эксперты!
Здравствуйте :) Зараннее прошу извинить за изложение - я новичок как в программированиии так и в написаниях сообщ. на форумах.
Очередная (наверное :)) головоломка по потокам.
Для выполнения множества динамических долгих по выполнению (сервера удаленные) SQL-запросов, что формируются как колонка в результате другого SQL-запроса, решено было создать пул из одновременно выполняющих запросы потоков. Итак, основной поток организовывал прокрутку запросов, а пул подпотоков выполнял.

Вот основные моменты кода:

(приложение)

Итак, главные объекты
hmMain - хэндл на мютекс для синхронизации данных (в частности, счетчика в MySemaphore) между основным и подчиненными потоками
MySemaphore - для временного замораживания осн. потока в случае перегрузки пула подпотоков.

Где я ошибся? Почему счетчик семафора "переполняется" и вылетает в п. 2 (подписано - есть комментарий)

Приложение:
  1.  
  2.  
  3. procedure TfrmMain.Run;
  4. var
  5. i, RecordCount : integer;
  6. begin
  7.  
  8. //
  9. SetLength(ActiveThreads, seThreadsCount.Value);
  10. try
  11. for i := 0 to seThreadsCount.Value-1 do
  12. begin
  13. ActiveThreads<i>.thr := nil;
  14. ActiveThreads<i>.Status:=tsNil;
  15. end;
  16.  
  17. if (hmMain&lt;&gt;0)then
  18. try
  19.  
  20. try
  21. sqlqOperators.First;
  22. i:=0;
  23. while not sqlqOperators.Eof do
  24. begin
  25. CreateQueryThread(i);
  26. Application.ProcessMessages;
  27. sqlqOperators.Next;
  28. inc(i);
  29. end;
  30. WaitForAllQueryThreadsDone;
  31. CheckForQueryResults;
  32. finally
  33. MySemaphore.Free;
  34. end;
  35. finally
  36. CloseHandle(hmMain);
  37. end;
  38. finally
  39. Finalize(ActiveThreads);
  40. end;
  41. end;
  42.  
  43.  
  44.  
  45. function TfrmMain.CreateQueryThread(Counter : Integer): Integer;
  46. var
  47. nth : TThreadQueryVRNTK;
  48. i : Integer;
  49. s:string;
  50. begin
  51. AddLog(&#039;Main before MySemaphore.CheckBeforeAdd&#039;);
  52.  
  53. AddLog(&#039;Main after MySemaphore.CheckBeforeAdd&#039;);
  54. WaitForMainBlock;
  55. AddLog(&#039;Main after WaitForMainBlock&#039;);
  56. try
  57. if (MySemaphore.AddRef&gt;MySemaphore.MaxCount) then
  58. begin
  59. s:=&#039;MySemaphore.AddRef&gt;MySemaphore.MaxCount&#039;;
  60. AddLog(s);
  61.  
  62. end;
  63. nth := nil;
  64. Result:=-1;
  65. for i := 0 to Length(ActiveThreads)-1 do
  66. begin
  67. case ActiveThreads<i>.Status of
  68. tsNil :
  69. begin
  70. nth := TThreadQueryVRNTK.Create(True);
  71. nth.ConnectionString:=Copy(sqlcOperators.Params.Text, 1, Length(sqlcOperators.Params.Text));
  72. ActiveThreads<i>.thr:=nth;
  73. nth.ThreadNumber:=i;
  74. nth.FreeOnTerminate:=True;
  75. end;
  76.  
  77. nth := ActiveThreads<i>.thr;
  78. tsRunning:
  79. Continue;
  80. end;
  81. nth.SQL := Copy(sqlqOperators.FieldByName(&#039;SQL&#039;).AsString, 1, Length(sqlqOperators.FieldByName(&#039;SQL&#039;).AsString));
  82. nth.QueryNumber:=Counter;
  83. ActiveThreads<i>.Status:=tsRunning;
  84. Result:=i;
  85. break;
  86. end;
  87.  
  88. if (nth=nil) then
  89. begin
  90. s:=&#039;(&#039;;
  91. for i := 0 to Length(ActiveThreads)-1 do
  92. s:=s+Format(&#039;[PoolID=%d, QueryNumb=%d], &#039;, [i, ActiveThreads<i>.thr.QueryNumber]);
  93. s:=s+&#039;)&#039;;
  94. s:=Format(&#039;Error creating thread for query %d. Nth is nil. Result=%d. ActiveThreads are:#13#10%s&#039;, [Counter+1, Result, s]);
  95. AddLog(s);
  96. raise exception.Create(s);
  97. end
  98. else
  99. NewQueryRunning(nth);
  100. if (i&gt;Length(ActiveThreads)) then
  101.  
  102. finally
  103. ReleaseMainBlock(nil);
  104. end;
  105.  
  106.  
  107. end;
  108.  
  109.  
  110.  
  111. function TMySemaphore.AddRef: Integer;
  112. begin
  113. inc(FCounter);
  114. Result := FCounter;
  115. end;
  116.  
  117. procedure TMySemaphore.Release;
  118. begin
  119. dec(FCounter);
  120. end;
  121.  
  122. procedure TMySemaphore.CheckBeforeAdd;
  123. begin
  124. if (FCounter&gt;=FMaxCount) then
  125. begin
  126. if (WaitForSingleObject(FhEvent, INFINITE)&lt;&gt;WAIT_OBJECT_0) then
  127. raise Exception.Create(&#039;WaitForSingleObject(FhEvent, INFINITE)&lt;&gt;WAIT_OBJECT_0&#039;);
  128. ResetEvent(FhEvent);
  129. end;
  130. end;
  131.  
  132. procedure TMySemaphore.CheckAfterRelease;
  133. begin
  134. if (FCounter&gt;=FMaxCount-1) then
  135. SetEvent(FhEvent);
  136. end;
  137.  
  138. constructor TMySemaphore.Create(Count : Integer);
  139. begin
  140. FMaxCount:=Count;
  141. FCounter:=0;
  142. FhEvent := CreateEvent(nil, True, False, nil);
  143. end;
  144.  
  145.  
  146.  
  147. procedure TThreadQueryVRNTK.Execute;
  148. begin
  149. try
  150. //SetName;
  151.  
  152.  
  153. //raise(Exception.CreateFmt(&#039;Thread=%d&#039;, [GetCurrentThreadId]));
  154. finally
  155.  
  156. end;
  157. end;
  158.  
  159.  
  160.  
  161. procedure TfrmMain.ReleaseQueryThread(athq: TThreadQueryVRNTK);
  162. begin
  163. //athq.AddLog(Format(&#039;Thread %d before ReleaseQueryThread&#039;, [athq.ThreadID]));
  164. WaitForMainBlock;
  165. try
  166. MySemaphore.Release;
  167.  
  168. finally
  169. ReleaseMainBlock(athq);
  170. MySemaphore.CheckAfterRelease;
  171. end;
  172. //athq.AddLog(Format(&#039;Thread %d after ReleaseQueryThread&#039;, [athq.ThreadID]));
  173. end;
  174.  
  175.  
  176.  
  177.  
  178. procedure TfrmMain.ReleaseMainBlock(Athr:TThread);
  179. var
  180. RelRes:BOOL;
  181. begin
  182. RelRes := ReleaseMutex(hmMain);
  183. if (not RelRes) then
  184. //if (Athr&lt;&gt;nil) then
  185. Win32Check(RelRes);
  186. end;
  187.  
  188. procedure TfrmMain.WaitForMainBlock;
  189. var
  190. WaitRes:integer;
  191. begin
  192. //while
  193. WaitRes:=WaitForSingleObject(hmMain, INFINITE);
  194. if (WaitRes&lt;&gt;WAIT_OBJECT_0) then //do;
  195. raise(Exception.CreateFmt(&#039;res=%d&#039;, [WaitRes]));
  196. end;


Иванов Сергей - 1109 Вопрос ожидает решения (принимаются ответы, доступен мини-форум)

Вопрос задал: Иванов Сергей - 1109 (статус: Посетитель)
Вопрос отправлен: 11 июня 2007, 18:45
Состояние вопроса: открыт, ответов: 1.

Ответ #1. Отвечает эксперт: Вадим К

Здравствуйте, Иванов Сергей!
Вы сделали свою реализацию семафора, а она не потокобезопасная. Почему бы не использовать стандартную? А так как она не является безопасной, то она и не работает. Точнее работает, но не всегда.

Ответ отправил: Вадим К (статус: Академик)
Время отправки: 11 июня 2007, 19:24
Оценка за ответ: 2

Комментарий к оценке: Где потоковая опасность в приведенных участках кода??? Ответ нужно разьяснить. Ибо с таким успехом и сам TThread можно считать потокоопасным. Хотя по-моему тут и есть доля истины. Но нужно прочувствовать сценарий.

Мини-форум вопроса

Всего сообщений: 5; последнее сообщение — 12 июня 2007, 13:05; участников в обсуждении: 2.
Иванов Сергей - 1109

Иванов Сергей - 1109 (статус: Посетитель), 11 июня 2007, 22:00 [#1]:

М-да... Похоже, нашел deadlock... Алгоритм, вроде, такой (Main-гл. поток, Sub-подчин.):
1) (Main) AddRef
2) (Sub) CheckAfterRelease -> SetEvent
3) (Main) bla-bla
4) (Main) CheckBeforeAdd ->WaitForSingleObject - а воно в состоянии signaled!!!!!!!
А кто-нить знает как эту байду переписать с минимальным изменением кода?
Иванов Сергей - 1109

Иванов Сергей - 1109 (статус: Посетитель), 12 июня 2007, 10:36 [#2]:

Переставил в 5). Сделал что-то вроде:
try
MySemaphore.CheckAfterRelease;
finally
ReleaseMainBlock(athq);
end;
- не помогло тоже. Алгоритм завала подобен прошлому. А именно:
1) (Sub) CheckAfterRelease : FCount=FMax-1 => SetEvent
2) (Main) Создать новый поток CreateQueryThread => FCount=FMax
3) (Main) Ещё раз создать новый поток CreateQueryThread - пройдет, поскольку Event в состоянии signalled.
Есть, дальше, два варианта
1) ResetEvent из семафора перенести в защищенную часть кода (в AddRef по условию, скажем).
2) Денйствительно, переписать всё на системный семафор. Но мне кажется, что можно и без него обойтись (поскольку последний решает чуть другие проблемы - и гемморой опять обеспечен)
Иванов Сергей - 1109

Иванов Сергей - 1109 (статус: Посетитель), 12 июня 2007, 10:42 [#3]:

Ну что же. В варианте
function TMySemaphore.AddRef: Integer;
begin
inc(FCounter);
if (FCounter=FMaxCount) then
ResetEvent(FhEvent);
Result := FCounter;
end;
всё работает! Интуитивно ясно, но не уверен...
Вадим К

Вадим К (статус: Академик), 12 июня 2007, 11:14 [#4]:

Новичок, а такой грубый.
>>Где потоковая опасность в приведенных участках кода???
Я чётко сказал - в реализации семафора.
>>Ответ нужно разьяснить
Ну я решил, что после такого вопроса (сложноватого достаточно. Новичок такое не сделает) мой ответ будет понятен.

Сама небезопасность находиться в следующих строках
---
if (FCounter>=FMaxCount-1) then
SetEvent(FhEvent);
--
if (FCounter>=FMaxCount) then
begin
if (WaitForSingleObject(FhEvent, INFINITE)<>WAIT_OBJECT_0) then
raise Exception.Create('WaitForSingleObject(FhEvent, INFINITE)<>WAIT_OBJECT_0');
ResetEvent(FhEvent);

Если бы внимательно почитали Рихтера, то знали бы, что Windows имеет полное право прервать ваш поток в любом месте. Тоесть функции и процедуры не атомарны. Поэтому в случае с
if (FCounter>=FMaxCount-1) then
SetEvent(FhEvent);
Условие может провериться и оно пускай выполняется, а в этот момент другой поток прервал и тоже проверил условие. оно естественно тоже выполнилось и установил событие. теперь выполнился третий поток и изменил FCounter так, что условие уже не выполняется. А теперь первый поток снова получает управление. и тут засада. Условие уже не выполняется, а событие устанавливается. Подумайте над этим
Галочка "подтверждения прочтения" - вселенское зло.
Иванов Сергей - 1109

Иванов Сергей - 1109 (статус: Посетитель), 12 июня 2007, 13:05 [#5]:

Вадим К, прошу извинить. Но тем не менее я просто рассчитывал именно на сценарий. Вроде как разобрался. Спасибо.
>>
if (FCounter>=FMaxCount-1) then
SetEvent(FhEvent);
стала атомарной в варианте № 2 (села под общий мютекс). А вот с ResetEvent я ещё долго возился.
Самое интересное, что , вроде, для подобной организации можно обойтись таки одним объектом синхронизации, но до этого я ещё не "дожил" :). Ещё раз спасибо.

Чтобы оставлять сообщения в мини-форумах, Вы должны авторизироваться на сайте.

Версия движка: 2.6+ (26.01.2011)
Текущее время: 22 февраля 2025, 11:29
Выполнено за 0.03 сек.