Inspirel banner

Programming Distributed Systems with YAMI4

5.1.2 C++

In C++ the outgoing message is created by the agent when the user code asks it for sending a new message:

auto_ptr<outgoing_message> message(
    my_agent.send("tcp://somewhere:12345",
        "my_object", "do_something", content));

Above, the message is sent to the given target, destination object name and with a given message name. The content object is supposed to be a parameters object; if it is omitted, the empty object is used by default. The outgoing message object is created by the agent on the free store and returned by pointer, so it is the user's responsibility to manage its further lifetime correctly.

The possible message states are defined in the message_state enumeration type. As described in the main part of this section, message states can be tracked and used for synchronization to enable the user code to align its processing with the message progress that is performed by background tasks. Thus, a complete idiom for message sending in C++ can involve the following structure:

try
{
    auto_ptr<outgoing_message> message(
        my_agent.send("tcp://somewhere:12345",
            "my_object", "do_something", content));

    // do something else while the message makes progress
    // or synchronize with its completion:
    message->wait_for_completion();

    message_state state = message->get_state();
    if (state == replied)
    {
        const parameters & reply_content = message->get_reply();

        // extract values from the reply content and use them
        // ...
    }
    else if (state == rejected)
    {
        const string & reason = message->get_exception_msg();

        // the reason is a description of the exception or error
        // that caused the message rejection
    }
    else
    {
        // the message has been abandoned,
        // because the given channel has been closed
    }
}
catch (const yami_runtime_error & ex)
{
    // message could not be sent because the connection
    // was not established with remote agent
}

In addition to the proper idiom for managing complete messaging cycle in the command-response interaction, the above example shows that in case of successful reception of message reply, the reply content can be extracted from the outgoing message object.

The wait_for_completion() and wait_for_transmission() functions support the optional relative timeout argument. If the timeout argument is omitted, the execution can potentially block forever. The absolute timeouts are supported by the _absolute() variant of these functions.

Asynchronous outgoing message notification, useful in those applications where the blocking nature of wait_... functions would not be convenient (this is particularly relevant in GUI programs), is based on callbacks into user-provided functor. The simplest callback functor is a non-member function that accepts outgoing message as its single parameter:

void my_message_callback(outgoing_message & msg)
{
    // ...
}

Such callback can be provided to appropriately overloaded send function when the message is sent:

agent::outgoing_message_id msg_id = my_agent.send(
    my_message_callback,
    "tcp://somewhere:12345",
    "my_object", "do_something", content);

The callback functor is automatically invoked each time there is a state change in the outgoing message object. The typical callback implementation will therefore be similar to the idiom presented above, where the message state is obtained with get_state() and then appropriate action is taken.

Note that the callback is executed in the context of the agent worker thread and should not block unnecessarily, as this would prevent the agent worker thread from making progress with other tasks. It is safe to call non-blocking functions on the same agent.

The outgoing message identifier can be later used to cancel the callback if the message is not yet completed - this is recommended to avoid memory leaks in case the message reply or rejection is known to never arrive:

my_agent.clean_outgoing_message_callback(msg_id);

The callback is automatically cancelled after reporting one of the completing message states.