Wednesday, 30 November 2016

Incrementing Progress Bar From a ForEach Loop

A deceptively simple question (how do you update a progress bar from a ForEach loop?) popped up on the Google+ OmniThreadLibrary community. The implementation turned out to be quite tricky, so Primož Gabrijelčič created an explanatory example (55_ForEachProgress), which is now part of the OmniThreadLibrary SVN repository.

The starting point was a simple Parallel.ForEach loop, which he further simplified in the demo:
Parallel
  .ForEach(1, CNumLoop)
  .Execute(
    procedure (const task: IOmniTask; const i: integer)
    begin
      // do some work
      Sleep(1);

      // update the progress bar - how?
    end
  );

We cannot simply update the progress bar from the ForEach executor, because that code executes in a background thread and the VCL GUI must never be accessed from a background thread. Sending “please update” Windows messages to the main thread does not help either: Parallel.ForEach is blocking by default (it waits for all workers to finish), so the main thread would not process any messages while ForEach executes.

The first part of the solution is to make ForEach non-blocking by adding the .NoWait modifier. We also have to store the interface returned from the Parallel.ForEach call in a field that outlives the calling method (here, a field of the form); otherwise the ForEach object would be destroyed on exit from the method in which Parallel.ForEach is called.

type
  TfrmForEachWithProgressBar = class(TForm)
    …
  private
    FWorker: IOmniParallelLoop<integer>;
  end;

FWorker := Parallel
  .ForEach(1, CNumLoop)
  .NoWait;

The problem now is how to destroy the FWorker interface. Parallel.ForEach provides an OnStop delegate, which is called when the last worker thread finishes its job. The delegate is, however, called from the worker thread, so we must not destroy FWorker there: that would destroy the ForEach object while the last worker is still running and lead to a crash or a hung program. The correct way is to schedule the cleanup on the main thread by using the Invoke method.
  // reference must be kept in a global field so that the task controller 
  // is not destroyed before the processing ends
  FWorker := Parallel
    .ForEach(1, CNumLoop)
    .NoWait // important, otherwise message loop will be blocked while 
            // ForEach waits for all tasks to terminate
    .OnStop(
      procedure (const task: IOmniTask)
      begin
        // because of NoWait, OnStop delegate is invoked from the worker code;
        // we must not destroy the worker at that point or the program will
        // block or crash
        task.Invoke(
          procedure begin
            FWorker := nil;
          end
        );
      end
    );

Just a side note – I oh so miss type inference and better anonymous method syntax in Delphi! In Smart, OnStop handler would be written as
.OnStop(
  lambda(task)
    task.Invoke(lambda FWorker := nil; end); 
  end);

Destruction being taken care of, we still have to update the progress bar. To do that, the worker calls the IncrementProgressBar method via the Invoke mechanism, so that it executes in the main thread and can safely update the VCL.
FWorker.Execute(
  procedure (const task: IOmniTask; const i: integer)
  begin
    // do some work
    Sleep(1);

    // update the progress bar
    // we cannot use 'i' for progress as it does not increase sequentially
    // IncrementProgressBar uses internal counter to follow the progress
    task.Invoke(IncrementProgressBar);
  end
);

Because the values of i are not passed to the worker method in sequential order, we cannot use them to determine the progress. Instead, the main form keeps its own count of the work done so far. The counters are initialized before the Parallel.ForEach is created.
pbForEach.Max := 100;
pbForEach.Position := 0;
pbForEach.Update;
FProgress := 0;
FPosition := 0;

In the end, IncrementProgressBar, well, increments the progress bar. It also makes sure that we don’t flood the Windows control with messages: the control is touched only when the computed position actually changes.
procedure TfrmForEachWithProgressBar.IncrementProgressBar;
var
  newPosition: integer;
begin
  Inc(FProgress);
  newPosition := Trunc((FProgress / CNumLoop)*pbForEach.Max);

  // make sure we don't overflow TProgressBar with messages
  if newPosition <> FPosition then begin
    pbForEach.Position := newPosition;
    FPosition := newPosition;
  end;
end;
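
To get a feel for how much the newPosition <> FPosition check saves, the following console sketch replays the same arithmetic without any GUI. CNumLoop = 10000 is an assumed value (the article does not show the constant), CBarMax mirrors pbForEach.Max, and the local variables are hypothetical stand-ins for the form's FProgress and FPosition fields.

```delphi
program ProgressThrottleDemo;

{$APPTYPE CONSOLE}

uses
  System.SysUtils;

const
  CNumLoop = 10000; // assumed value; the demo's real constant is not shown
  CBarMax  = 100;   // mirrors pbForEach.Max from the article

var
  progress, position, newPosition, updates, k: Integer;
begin
  progress := 0;   // stands in for FProgress
  position := 0;   // stands in for FPosition
  updates  := 0;   // how often the control would really be touched

  for k := 1 to CNumLoop do
  begin
    // same arithmetic as IncrementProgressBar
    Inc(progress);
    newPosition := Trunc((progress / CNumLoop) * CBarMax);
    if newPosition <> position then
    begin
      position := newPosition;
      Inc(updates);
    end;
  end;

  Writeln(Format('%d calls, %d position updates, final position %d',
    [CNumLoop, updates, position]));
end.
```

Under these assumptions the control is updated 100 times for 10000 calls, because the truncated position can only take the values 1 to 100 and moves by at most one step per call.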

If you are enumerating over a very large range, you’ll also want to reduce the number of Invoke(IncrementProgressBar) calls. Each Invoke causes a Windows message to be sent, and sending millions of messages will hurt program performance. The simplest way to do that is to call IncrementProgressBar only when the loop counter is a nice round value. Note that IncrementProgressBar then runs only once per 1000 iterations, so it has to advance FProgress by 1000 per call (instead of Inc(FProgress)) for the bar to still reach its maximum. For example:
if (i mod 1000) = 0 then
  task.Invoke(IncrementProgressBar);

This copy is kept for self-study and as a personal archive.
All credit goes to the original author.
Source: originally written by Primož Gabrijelčič.




Monday, 28 November 2016

Using OmniThreadLibrary’s Message Queue with a TThread Worker

This is a simple example, now part of the OTL repository (stored in the folder examples\TThread communication). It covers two separate topics:
  • Sending data from any thread (main or background) to a TThread-based worker.
  • Sending data from a TThread-based worker to a form.
Let’s deal with them one by one.
1. Sending data from multiple producers to a single worker
To send data from a form to a thread, we need a message queue. This example uses a TOmniMessageQueue object for that purpose. An instance of this object is created in the main thread. All threads (the main thread, the worker thread, and possibly other data-producing threads) use the same shared object, which is written with thread safety in mind.

1.a Initialization and cleanup
The TOmniMessageQueue constructor takes the maximum queue size as a parameter. TWorker is a simple TThread descendant that accepts the message queue instance as a parameter so that it can read from the queue.
FCommandQueue := TOmniMessageQueue.Create(1000);
FWorker       := TWorker.Create(FCommandQueue);  
The shutdown sequence is fairly standard. Stop is used instead of Terminate so that it can set the internal event that signals the thread to stop.
if Assigned(FWorker) then
begin
  FWorker.Stop;
  FWorker.WaitFor;
  FreeAndNil(FWorker);
end;
FreeAndNil(FCommandQueue);

1.b Sending data to the worker
To put some data into a queue, use its Enqueue method. It accepts a TOmniMessage record. Each TOmniMessage contains an integer message ID (not used in this example) and a TOmniValue payload which, in turn, can hold any data type.
procedure TfrmTThreadComm.Query(value: integer);
begin
  if not FCommandQueue.Enqueue(TOmniMessage.Create(0 {ignored}, value)) then
    raise Exception.Create('Command queue is full!');
end;
Enqueue returns False if the queue is full. (A TOmniMessageQueue can only hold as many elements as specified in the constructor call.)

The example also shows how everything works correctly if two threads are started at the same time and both write to the message queue.

var
  th1: TThread;
  th2: TThread;
begin
  th1 := TThread.CreateAnonymousThread(
    procedure
    begin
      Query(Random(1000));
    end);
  th2 := TThread.CreateAnonymousThread(
    procedure
    begin
      Query(Random(1000));
    end);
  th1.Start;
  th2.Start;
end;
1.c Receiving the data
The worker’s Execute method waits on two handles in a loop. If FStopEvent (an internal event) is signalled, the loop exits. If the handle returned by the message queue’s GetNewMessageEvent method is signalled, new data has arrived in the queue. In that case, the code loops to empty the message queue and then waits again for something to happen.
procedure TWorker.Execute;
var
  handles: array [0..1] of THandle;
  msg    : TOmniMessage;
begin
  handles[0] := FStopEvent.Handle;
  handles[1] := FCommandQueue.GetNewMessageEvent;
  // keep looping only while the second handle (new message) is the one signalled
  while WaitForMultipleObjects(2, @handles, false, INFINITE) = (WAIT_OBJECT_0 + 1) do
  begin
    while FCommandQueue.TryDequeue(msg) do
    begin
      // process the message …
    end;
  end;
end;
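
The article does not show the rest of TWorker (constructor, Stop, the event itself), so here is a minimal stand-alone sketch of the same "stop event plus data event" wait loop. It is not the demo's code: TDemoWorker, Post, FDataEvent and the two counters are hypothetical names, and the message queue is replaced by a plain counter so that the program compiles with only the RTL and Windows units.

```delphi
program TwoHandleWaitDemo;

{$APPTYPE CONSOLE}

uses
  Winapi.Windows, System.SysUtils, System.Classes, System.SyncObjs;

type
  // Hypothetical stand-in for the article's TWorker; no OTL units needed.
  TDemoWorker = class(TThread)
  strict private
    FStopEvent: TEvent;   // signalled by Stop
    FDataEvent: TEvent;   // signalled by Post ("new data has arrived")
    FPending: Integer;    // items waiting to be processed
    FProcessed: Integer;  // items handled so far
  protected
    procedure Execute; override;
  public
    constructor Create;
    destructor Destroy; override;
    procedure Post;
    procedure Stop;
    property Processed: Integer read FProcessed;
  end;

constructor TDemoWorker.Create;
begin
  FStopEvent := TEvent.Create(nil, True, False, '');   // manual-reset
  FDataEvent := TEvent.Create(nil, False, False, '');  // auto-reset
  inherited Create(False);
end;

destructor TDemoWorker.Destroy;
begin
  Stop;       // make sure the wait loop can exit
  inherited;  // the thread has ended once this returns
  FStopEvent.Free;
  FDataEvent.Free;
end;

procedure TDemoWorker.Post;
begin
  TInterlocked.Increment(FPending);
  FDataEvent.SetEvent;
end;

procedure TDemoWorker.Stop;
begin
  FStopEvent.SetEvent;
end;

procedure TDemoWorker.Execute;
var
  handles: array [0..1] of THandle;
begin
  handles[0] := FStopEvent.Handle;
  handles[1] := FDataEvent.Handle;
  // Loop only while index 1 (data) is the handle that fired.
  while WaitForMultipleObjects(2, @handles, False, INFINITE) = (WAIT_OBJECT_0 + 1) do
    // Drain everything that has accumulated, then wait again.
    while FPending > 0 do
    begin
      TInterlocked.Decrement(FPending);
      TInterlocked.Increment(FProcessed);
    end;
end;

var
  worker: TDemoWorker;
  i: Integer;
begin
  worker := TDemoWorker.Create;
  try
    for i := 1 to 3 do
      worker.Post;
    while worker.Processed < 3 do
      Sleep(10);  // crude wait, good enough for a demo
    worker.Stop;
    worker.WaitFor;
    Writeln('processed: ', worker.Processed);
  finally
    worker.Free;
  end;
end.
```

The stop event sits at index 0 on purpose: when both events are signalled, WaitForMultipleObjects reports the lowest index, so a stop request wins over pending data. That is also why the main program waits for the three items before calling Stop.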
  
2. Sending data from a worker to the form
To send messages from a worker thread to a form we need another instance of TOmniMessageQueue. As we can’t wait on a handle in the main thread (that would block the user interface), we’ll use a different notification mechanism – a window message observer.

2.a Initialization and cleanup
We create the queue just as in the first part. Then we create a window message observer and, at the end, Attach it to the message queue. A window message observer sends a window message to a window each time the message queue changes. The four parameters passed to CreateContainerWindowsMessageObserver are the handle of the window that will receive those messages, a message ID, WPARAM, and LPARAM.
FResponseQueue := TOmniMessageQueue.Create(1000);
// Handle is the form's window handle; the two zeros are placeholder WPARAM/LPARAM values
FResponseObserver := CreateContainerWindowsMessageObserver(Handle, MSG_WORKER_RESULT, 0, 0);
FResponseQueue.ContainerSubject.Attach(FResponseObserver, coiNotifyOnAllInserts);

While shutting down, we first have to Detach the observer from the queue. Then we destroy the observer and empty the response queue (ProcessResults) to process any results that may still be waiting inside.
FResponseQueue.ContainerSubject.Detach(FResponseObserver,coiNotifyOnAllInserts);  
FreeAndNil(FResponseObserver);  
ProcessResults;  
FreeAndNil(FResponseQueue);  
2.b Sending data to the form
To send data, we use exactly the same approach as in 1.b.
if not FResponseQueue.Enqueue(TOmniMessage.Create(0 {ignored},
         Format('= %d', [msg.MsgData.AsInteger * 2]))) then
  raise Exception.Create('Response queue is full!');
2.c Receiving the data
On the receiving side (the form) we have to set up a message handler method for the message that is sent by the window message observer. This method simply calls another method, ProcessResults.
const
  MSG_WORKER_RESULT = WM_USER;

// declared inside the form class
procedure WorkerResult(var msg: TMessage); message MSG_WORKER_RESULT;

procedure TfrmTThreadComm.WorkerResult(var msg: TMessage);
begin
  ProcessResults;
end;
As a final step, ProcessResults reads data from the message queue and displays each element in a listbox.
procedure TfrmTThreadComm.ProcessResults;
var
  msg: TOmniMessage;
begin
  while FResponseQueue.TryDequeue(msg) do
  begin
    // msg.MsgID is ignored in this demo
    // msg.MsgData contains a string, generated by the worker
    lbLog.ItemIndex := lbLog.Items.Add(msg.MsgData);
  end;
end;
  
3. Using a TOmniBlockingCollection instead of TOmniMessageQueue
Alternatively, you can use the blocking collection implementation from OtlCollections instead. A blocking collection is appropriate when you have to handle a large number of queued work requests or responses, because it grows and shrinks dynamically instead of having a fixed capacity.

The only important change to the code would be in part 1 as you’d have to create an event observer manually while TOmniMessageQueue does it automatically. For details you can check TOmniMessageQueue.AttachWinEventObserver and TOmniMessageQueue.Destroy.
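
As a rough illustration of the "grows dynamically" point, here is a single-threaded sketch of the add/take cycle. It assumes OmniThreadLibrary is on the search path and relies on the Add, CompleteAdding and Take methods of TOmniBlockingCollection as I understand them; it does not show the event observer wiring mentioned above, and the program and variable names are made up for the example.

```delphi
program BlockingCollectionSketch;

{$APPTYPE CONSOLE}

uses
  System.SysUtils,
  OtlCommon,       // TOmniValue
  OtlCollections;  // TOmniBlockingCollection

var
  queue: TOmniBlockingCollection;
  value: TOmniValue;
  i: Integer;
begin
  queue := TOmniBlockingCollection.Create;
  try
    // No capacity is passed to the constructor; the collection grows as needed.
    for i := 1 to 5 do
      queue.Add(i);

    // Tell consumers that no more items will follow.
    queue.CompleteAdding;

    // Take blocks while the collection is empty and returns False once it is
    // both empty and completed, which ends the loop.
    while queue.Take(value) do
      Writeln(value.AsInteger);
  finally
    queue.Free;
  end;
end.
```

In the TThread example the producer and consumer would of course live in different threads; the sketch keeps everything in one thread only to stay short.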

Source: this post was originally written by Primož Gabrijelčič.