Inspirel banner

Programming Distributed Systems with YAMI4

9.3.1 Ada

The Ada publisher and subscriber programs can be compiled by running make, assuming that the core library was already compiled.

The publisher program is entirely implemented in the publisher.adb file, which is dissected in detail below.

First the relevant YAMI4 packages are withed:

with YAMI.Agents;
with YAMI.Parameters;
with YAMI.Value_Publishers;

The Value_Publishers package provides the API for managing data-publication entities.

Some standard packages are also needed to handle command-line arguments and random number generation:

with Ada.Command_Line;
with Ada.Exceptions;
with Ada.Numerics.Discrete_Random;
with Ada.Text_IO;

The publisher is very similar in its structure to a typical server, as already presented in previous examples, but here an important new entity is the Value_Publisher object that encapsulates the subscription management for a single data source:

procedure Publisher is
begin
   if Ada.Command_Line.Argument_Count /= 1 then
      Ada.Text_IO.Put_Line
        ("expecting one parameter: publisher destination");
      Ada.Command_Line.Set_Exit_Status
        (Ada.Command_Line.Failure);
      return;
   end if;

   declare
      Publisher_Address : constant String :=
        Ada.Command_Line.Argument (1);

      Publisher_Agent : aliased YAMI.Agents.Agent :=
        YAMI.Agents.Make_Agent;
      Resolved_Publisher_Address :
        String (1 .. YAMI.Agents.Max_Target_Length);
      Resolved_Publisher_Address_Last : Natural;

      Random_Value : YAMI.Value_Publishers.Value_Publisher;

The Random_Value object declared above will be later registered as a regular YAMI4 object that can be seen by remote clients.

      Content : YAMI.Parameters.Parameters_Collection :=
        YAMI.Parameters.Make_Parameters;

      package My_Random_Numbers is
         new Ada.Numerics.Discrete_Random
        (YAMI.Parameters.YAMI_Integer);

      My_Random_Generator : My_Random_Numbers.Generator;

      Random : YAMI.Parameters.YAMI_Integer;

      use type YAMI.Parameters.YAMI_Integer;
   begin
      Publisher_Agent.Add_Listener
        (Publisher_Address,
         Resolved_Publisher_Address,
         Resolved_Publisher_Address_Last);

      Ada.Text_IO.Put_Line
        ("The publisher is listening on " &
           Resolved_Publisher_Address
           (1 .. Resolved_Publisher_Address_Last));

The value publisher object is registered as a regular object, but to facilitate internal object structure the registration is initiated by the value publisher instead of the agent. In any case, in this example the "random_number" is the name that will be seen by remote clients.

      Random_Value.Register_At
        (Publisher_Agent'Unchecked_Access, "random_number");

The publisher performs a simple work of generating random numbers every second and publishing them to the currently subscribed clients.

All values that are published need to be expressed in terms of the parameters object, which is consistent with the general YAMI4 data model. In this example a single random value is published as an integer entry in the parameters object:

      --  publish random numbers forever
      loop
         Random :=
           My_Random_Numbers.Random (My_Random_Generator)
           mod 100;
         Content.Set_Integer ("value", Random);

         Ada.Text_IO.Put_Line
           ("publishing value " &
              YAMI.Parameters.YAMI_Integer'Image (Random));

         Random_Value.Publish (Content);

The above operation is asynchronous just like any other message-sending operation. Internally, it scans the list of currently subscribed clients and sends them an update message with the given parameters object. The actual messages are sent in background.

The publisher's main loop involves one-second delay, but in practical programs data publishing need not be periodic.

         delay 1.0;
      end loop;
   end;

The publisher concludes with basic exception handling.

exception
   when E : others =>
      Ada.Text_IO.Put_Line
        (Ada.Exceptions.Exception_Message (E));
end Publisher;

The subscriber program is implemented in a single subscriber.adb file.

Subscribers have both client- and server-side properties in the sense that they need to be able to both send a subscription message to the publisher - which is the client-side activity - and to receive the updates - which makes them act like servers.

The example subscriber code starts with relevant with clauses:

with YAMI.Agents;
with YAMI.Incoming_Messages;
with YAMI.Parameters;

with Ada.Command_Line;
with Ada.Exceptions;
with Ada.Text_IO;

procedure Subscriber is

To enable the subscriber to receive subscription updates, the appropriate message handler needs to be implemented. As with other servers, this is done by overriding the Call operation of the Message_Handler interface:

   type Update_Handler is
     new YAMI.Incoming_Messages.Message_Handler
     with null record;

   overriding
   procedure Call
     (H : in out Update_Handler;
      Message : in out
      YAMI.Incoming_Messages.Incoming_Message'Class) is

In this simple example the subscriber simply prints the received value on the console and similarly to other examples uses a local procedure to inspect the message content:

     procedure Process
       (Content : in out YAMI.Parameters.Parameters_Collection)
     is
        Value : constant YAMI.Parameters.YAMI_Integer :=
          Content.Get_Integer ("value");
     begin
        Ada.Text_IO.Put_Line
          ("received update: " &
             YAMI.Parameters.YAMI_Integer'Image (Value));
     end Process;

   begin
      Message.Process_Content (Process'Access);
   end Call;

   My_Handler : aliased Update_Handler;

The main part of the program obtains the publisher target from its command-line arguments and registers the message handler to enable proper routing of subscription updates.

An important property of this program is that even though it technically acts like a server (it receives updates via registered message handler), it does not have any listener. The listener is not needed here, because the subscription updates will be sent back using the same communication channel that is initially used for sending the subscription message - this is an example of reverting the channel's natural direction.

begin
   if Ada.Command_Line.Argument_Count /= 1 then
      Ada.Text_IO.Put_Line
        ("expecting one parameter: publisher destination");
      Ada.Command_Line.Set_Exit_Status
        (Ada.Command_Line.Failure);
      return;
   end if;

   declare
      Publisher_Address : constant String :=
        Ada.Command_Line.Argument (1);

      Subscriber_Agent : YAMI.Agents.Agent :=
        YAMI.Agents.Make_Agent;

      Update_Object_Name : constant String :=
        "update_handler";

      Params : YAMI.Parameters.Parameters_Collection :=
        YAMI.Parameters.Make_Parameters;
   begin
      --  prepare subscription update callback

      Subscriber_Agent.Register_Object
        (Update_Object_Name, My_Handler'Unchecked_Access);

Once the message handler is registered as a regular object, the subscription message can be sent to the publisher. It is important to properly inform the publisher of the intended object name where the updates are to be delivered - here, the object is registered with the name "update_handler" and this name is sent to the publisher as the "destination_object" entry in the subscription message's payload:

      --  subscribe to the producer

      Params.Set_String
        ("destination_object", Update_Object_Name);

      Subscriber_Agent.Send_One_Way
        (Publisher_Address,
         "random_number", "subscribe", Params);

      Ada.Text_IO.Put_Line
        ("subscribed, waiting for updates");

Above, the message that is sent to publisher is named ``subscribe'', which is automatically recognized by the value publisher object. Another recognized name is ``unsubscribe'', which causes the given subscriber to be removed from the list.

After all these preparations, the subscriber is ready to receive updates and falls in the infinite loop.

      loop
         delay 10.0;
      end loop;
   end;
exception
   when E : others =>
      Ada.Text_IO.Put_Line
        (Ada.Exceptions.Exception_Message (E));
end Subscriber;