Inspirel banner

Programming Distributed Systems with YAMI4

9.3.2 C++

The C++ publisher and subscriber programs can be compiled by running make, assuming that the core and C++ libraries were already compiled.

The publisher program is entirely implemented in the publisher.cpp file, which is dissected in detail below.

First relevant headers file is included:

#include <yami4-cpp/yami.h>

#include <cstdlib>
#include <iostream>

#include <pause.h>

The publisher is very similar in its structure to a typical server, as already presented in previous examples, but here an important new entity is the value_publisher object that encapsulates the subscription management for a single data source:

int main(int argc, char * argv[])
{
    if (argc != 2)
    {
        std::cout
            << "expecting one parameter: "
            << "publisher destination\n";
        return EXIT_FAILURE;
    }

    const std::string publisher_address = argv[1];

    try
    {
        yami::value_publisher random_value;

The random_value object declared above will be later registered as a regular YAMI4 object that can be seen by remote clients.

        yami::agent publisher_agent;

        const std::string resolved_address =
            publisher_agent.add_listener(publisher_address);

        std::cout << "The publisher is listening on "
            << resolved_address << std::endl;

The value publisher object is registered as a regular object. In this example the "random_number" is the name that will be seen by remote clients.

        publisher_agent.register_value_publisher(
            "random_number", random_value);

The publisher performs a simple work of generating random numbers every second and publishing them to the currently subscribed clients.

All values that are published need to be expressed in terms of the parameters object, which is consistent with the general YAMI4 data model. In this example a single random value is published as an integer entry in the parameters object:

        // publish random numbers forever
        yami::parameters content;
        while (true)
        {
            const int random = std::rand() % 100;
            content.set_integer("value", random);

            std::cout
                << "publishing value " << random << std::endl;

            random_value.publish(content);

The above operation is asynchronous just like any other message-sending operation. Internally, it scans the list of currently subscribed clients and sends them an update message with the given parameters object. The actual messages are sent in background.

The publisher's main loop involves one-second delay, but in practical programs data publishing need not be periodic. In this example a platform-independent pause() function is used, declared in the pause.h header that was included at the beginning of the program.

            // pause for 1s
            examples::pause();
        }
    }

The publisher concludes with basic exception handling.

    catch (const std::exception & e)
    {
        std::cout << "error: " << e.what() << std::endl;
    }
}

The subscriber program is implemented in a single subscriber.cpp file.

Subscribers have both client- and server-side properties in the sense that they need to be able to both send a subscription message to the publisher - which is the client-side activity - and to receive the updates - which makes them act like servers.

The example subscriber code starts with relevant include directives:

#include <yami4-cpp/yami.h>

#include <cstdlib>
#include <iostream>

To enable the subscriber to receive subscription updates, the appropriate message handler needs to be implemented. As with other servers, this is achieved with a global function that accepts the incoming message object as its single parameter.

The subscriber simply prints the received value on the console:

void update(yami::incoming_message & message)
{
    const yami::parameters & content =
        message.get_parameters();

    const int value = content.get_integer("value");

    std::cout << "received update: " << value << std::endl;
}

The main part of the program obtains the publisher target from its command-line arguments and registers the message handler to enable proper routing of subscription updates.

An important property of this program is that even though it technically acts like a server (it receives updates via registered message handler), it does not have any listener. The listener is not needed here, because the subscription updates will be sent back using the same communication channel that is initially used for sending the subscription message - this is an example of reverting the channel's natural direction.

int main(int argc, char * argv[])
{
    if (argc != 2)
    {
        std::cout
            << "expecting parameter: publisher destination\n";
        return EXIT_FAILURE;
    }

    const std::string publisher_address = argv[1];

    try
    {
        yami::agent subscriber_agent;

        // prepare subscription update callback

        const std::string update_object_name =
            "update_handler";

        subscriber_agent.register_object(
            update_object_name, update);

Once the message handler is registered as a regular object, the subscription message can be sent to the publisher. It is important to properly inform the publisher of the intended object name where the updates are to be delivered - here, the object is registered with the name "update_handler" and this name is sent to the publisher as the "destination_object" entry in the subscription message's payload:

        // subscribe to the producer

        yami::parameters params;
        params.set_string(
            "destination_object", update_object_name);

        subscriber_agent.send_one_way(server_address,
            "random_number", "subscribe", params);

        std::cout
            << "subscribed, waiting for updates" << std::endl;

Above, the message that is sent to publisher is named ``subscribe'', which is automatically recognized by the value publisher object. Another recognized name is ``unsubscribe'', which causes the given subscriber to be removed from the list.

After all these preparations, the subscriber is ready to receive updates and blocks to enable the background threads to continue their work.

        // block forever and receive updates in background
        int dummy;
        std::cin >> dummy;
    }
    catch (const std::exception & e)
    {
        std::cout << "error: " << e.what() << std::endl;
    }
}