Monday, 28 November 2016

Message Queue with a TThread Worker Using OmniThreadLibrary’s

A simple example, now part of the OTL repository (stored in the folder examples\TThread communication).Two separate topics are covered in this example:
  • Sending data from any thread (main or background) to a TThread-based worker.
  • Sending data from a TThread-based worker to a form.
Let’s deal with them one by one.
1. Sending data from multiple producers to a single worker
To send data form a form to a thread, we need a message queue. This example uses a TOmniMessageQueue object for that purpose. An instance of this object is created in the main thread. All threads – the main thread, the worker threads, and possible other data-producing threads – use the same shared object which is written with thread-safety in mind.

1.a Initialization and cleanup
The TOmniMessageQueue constructor takes a maximum queue size for a parameter. TWorker is just a simple TThread descendant which accepts the instance of the message queue as a parameter so it can read from the queue.
FCommandQueue := TOmniMessageQueue.Create(1000);
FWorker       := TWorker.Create(FCommandQueue);  
The shutdown sequence is fairly standard. Stop is used instead of Terminate so it can set internal event which is used to signal the thread to stop.
if assigned(FWorker) then   
 begin   
  FWorker.Stop;  
  FWorker.WaitFor;  
  FreeAndNil(FWorker);  
 end;  
  FreeAndNil(FCommandQueue);  

1.b Sending data to the worker
To put some data into a queue, use its Enqueue method. It accepts a TOmniMessage record. Each TOmniMessage contains an integer message ID (not used in this example) and a TOmniValue data which, in turn, can hold any data type.
procedure TfrmTThreadComm.Query(value: integer);  
 begin   
  if not FCommandQueue.Enqueue(TOmniMessage.Create(0 {ignored}, value)) then    
    raise Exception.Create('Command queue is full!');  
 end;  
Enqueue returns False if the queue is full. (A TOmniMessageQueue can only hold as much elements as specified in the constructor call.)

The example also shows how everything works correctly if two threads are started at the same time and both write to the message queue.

var  
  th1: TThread;  
  th2: TThread;  
 begin  
  th1 := TThread.CreateAnonymousThread(  
   procedure  
   begin  
    Query(Random(1000));  
   end);  
  th2 := TThread.CreateAnonymousThread(  
   procedure  
   begin  
    Query(Random(1000));  
   end);  
  th1.Start;  
  th2.Start;  
 end;  
1.c Receiving the data
The worker’s Execute method waits on two handles in a loop. If a FStopEvent (an internal event) is signalled, the loop will exit. If the message queue’s GetNewMessageEvent (a THandle-returning method) gets signalled, a new data has arrived to the queue. In that case, the code loops to empty the message queue and then waits again for something to happen.
procedure TWorker.Execute;  
 var 
   handles: array [0..1] of THandle;   
   msg  : TOmniMessage;  
 begin  
  handles[0] := FStopEvent.Handle;  
  handles[1] := FCommandQueue.GetNewMessageEvent;  
  while WaitForMultipleObjects(2, @handles, false, INFINITE)  
   = (WAIT_OBJECT_0 + 1) do  
  begin  
   while FCommandQueue.TryDequeue(msg) do  
   begin  
   //process the message …    
   end;   
  end;  
 end;
  
2. Sending data from a worker to the form
To send messages from a worker thread to a form we need another instance of TOmniMessageQueue. As we can’t wait on a handle in the main thread (that would block the user interface), we’ll use a different notification mechanism – a window message observer.

2.a Initialization and cleanup
We create the queue just as in the first part. Then we create a window message observer and at the end we Attach it to the message queue. A window message observer sends a window message to some window each time a message queue changes. The four parameters passed to CreateContainerWindowsMessageObserver are the handle of the window that will receive those messages, a message ID, WPARAM, and LPARAM.
FResponseQueue.ContainerSubject.Detach(FResponseObserver,coiNotifyOnAllInserts);  
 FreeAndNil(FResponseObserver);  
 ProcessResults;  
 FreeAndNil(FResponseQueue);  

While shutting down, we first have to Detach the observer from the queue. Then we destroy the observer and empty the response queue (ProcessResults) to process any results that may still be waiting inside.
FResponseQueue.ContainerSubject.Detach(FResponseObserver,coiNotifyOnAllInserts);  
FreeAndNil(FResponseObserver);  
ProcessResults;  
FreeAndNil(FResponseQueue);  
2.b Sending data to the form
To send a data, we use exactly the same approach as in 1.b.
if not FResponseQueue.Enqueue(TOmniMessage.Create(0 {ignored},  
      Format('= %d', [msg.MsgData.AsInteger * 2]))) then raise Exception.Create('Response queue is full!');  
2.c Receiving the data
On the receiving side (the form) we have to set up a message function associated with the message that is sent from the window message observer. In this method we’ll call another method ProcessResults.
const  MSG_WORKER_RESULT = WM_USER;
procedure WorkerResult(var msg: TMessage); message MSG_WORKER_RESULT;
procedure TfrmTThreadComm.WorkerResult(var msg: TMessage);
begin  
  ProcessResults;
end;
As a final step, ProcessResults reads data from the message queue and displays each element in a listbox.
procedure TfrmTThreadComm.ProcessResults;  
 var   
  msg: TOmniMessage;  
 begin   
  while FResponseQueue.TryDequeue(msg) do   
   begin    
    //msg.MsgID is ignored in this demo    
    //msg.MsgData contains a string, generated by the worker    
    lbLog.ItemIndex := lbLog.Items.Add(msg.MsgData);  
   end;  
 end;
  
3. Using a TOmniBlockingCollection instead of TOmniMessageQueue
Alternatively, you can use the blocking collection implementation from OtlCollections instead. A blocking collection would be appropriate in case you have to handle large number of work requests or responses stored in a queue as a blocking collection grows and shrinks dynamically.

The only important change to the code would be in part 1 as you’d have to create an event observer manually while TOmniMessageQueue does it automatically. For details you can check TOmniMessageQueue.AttachWinEventObserver and TOmniMessageQueue.Destroy.

Source : This post was written originally by : Primož Gabrijelčič Here

0 comments:

Post a Comment