Programming Distributed Systems with YAMI4

5.1.3 Java

In Java, the agent creates the outgoing message when the user code asks it to send a new message:

OutgoingMessage message = myAgent.send("tcp://somewhere:12345",
    "my_object", "do_something", content);

The parameters object passed as the message content is not referenced by the message after this call, so the user code can modify (or discard) it without affecting the message in any way.

In the call above, the message is sent to the given target, addressed to the given destination object name and carrying the given message name. The content object is expected to be a parameters object; if it is null, an empty object is used by default.

The possible message states are defined in the MessageState enumeration type. As described in the main part of this section, message states can be tracked and used for synchronization to enable the user code to align its processing with the message progress that is performed by background tasks. Thus, a complete idiom for message sending in Java can involve the following structure:

try {
    OutgoingMessage message =
        myAgent.send("tcp://somewhere:12345",
            "my_object", "do_something", content);

    // do something else while the message makes progress
    // or synchronize with its completion:
    message.waitForCompletion();

    OutgoingMessage.MessageState state = message.getState();
    if (state == OutgoingMessage.MessageState.REPLIED) {
        Parameters replyContent = message.getReply();

        // extract values from the reply content and use them
        // ...

    } else if (state == OutgoingMessage.MessageState.REJECTED) {
        String reason = message.getExceptionMsg();

        // the reason is a description of the exception or error
        // that caused the message rejection

    } else {
        // the message has been abandoned,
        // because the given channel has been closed
    }

    message.close();
} catch (YAMIIOException ex) {
    // message could not be sent because the connection
    // was not established with remote agent
}

In addition to the proper idiom for managing the complete messaging cycle in a command-response interaction, the above example shows that when the message reply is received successfully, the reply content can be extracted from the outgoing message object.

The waitForCompletion() and waitForTransmission() functions accept an optional relative timeout argument. If the timeout argument is omitted, the call can block indefinitely. Absolute timeouts are supported by the Absolute() variants of these functions.

Outgoing messages are associated with some internal resources in the agent. These resources should be disposed of when they are no longer needed, which is achieved by calling the close() method.
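One way to make sure close() is always reached, even when the code that processes the reply throws, is to put the call in a finally block. The following is only a minimal sketch of that pattern: FakeMessage is a hypothetical stand-in introduced here so that the example runs without the YAMI4 library, and its getReply() returns a String rather than a parameters object.

```java
class CloseInFinallySketch {

    // Hypothetical stand-in for an outgoing message; not part of YAMI4.
    static class FakeMessage {
        boolean closed = false;

        String getReply() {
            return "not-a-number";
        }

        void close() {
            closed = true;
        }
    }

    // Processes the reply and releases the message resources on every path,
    // including the one where parsing the reply throws.
    static int parseReply(FakeMessage message) {
        try {
            return Integer.parseInt(message.getReply());
        } finally {
            message.close();
        }
    }

    public static void main(String[] args) {
        FakeMessage message = new FakeMessage();
        try {
            parseReply(message);
        } catch (NumberFormatException ex) {
            System.out.println("reply could not be parsed");
        }
        System.out.println("closed: " + message.closed);
    }
}
```

With a real outgoing message the same shape applies: obtain the message, work with it inside try, and call close() in finally.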

Asynchronous notification for outgoing messages is useful in applications where the blocking nature of the wait... functions would be inconvenient (this is particularly relevant in GUI programs). It is based on user-provided callbacks that implement the following interface:

public interface OutgoingMessageCallback {
    void update(OutgoingMessage message) throws Exception;
}

An implementation of this callback interface can be passed to a separate overload of the send function when the message is sent:

OutgoingMessage message = myAgent.send(
    myMessageCallback,
    "tcp://somewhere:12345",
    "my_object", "do_something", content);

The callback object is automatically invoked each time there is a state change in the outgoing message object. The typical callback implementation will therefore be similar to the idiom presented above, where the message state is obtained with getState() and then appropriate action is taken.
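A callback of this kind might be structured as in the sketch below. To keep the example runnable without the YAMI4 library, it uses hypothetical stand-ins: FakeMessage and FakeCallback mimic the shape of OutgoingMessage and OutgoingMessageCallback, the reply is a plain String, and the State enum mirrors REPLIED and REJECTED from the text while the remaining state names are assumptions.

```java
import java.util.ArrayList;
import java.util.List;

class CallbackDispatchSketch {

    // Stand-in for OutgoingMessage.MessageState. REPLIED and REJECTED are
    // named in the text; the other names are assumptions.
    enum State { POSTED, TRANSMITTED, ABANDONED, REPLIED, REJECTED }

    // Hypothetical stand-in for the parts of OutgoingMessage used here.
    static class FakeMessage {
        private final State state;
        private final String reply;
        private final String reason;

        FakeMessage(State state, String reply, String reason) {
            this.state = state;
            this.reply = reply;
            this.reason = reason;
        }

        State getState() { return state; }
        String getReply() { return reply; }
        String getExceptionMsg() { return reason; }
    }

    // Same shape as OutgoingMessageCallback, but over the stand-in type.
    interface FakeCallback {
        void update(FakeMessage message) throws Exception;
    }

    // Records one entry per completing state and ignores intermediate ones.
    static class RecordingCallback implements FakeCallback {
        final List<String> events = new ArrayList<>();

        @Override
        public void update(FakeMessage message) {
            switch (message.getState()) {
                case REPLIED:
                    events.add("reply: " + message.getReply());
                    break;
                case REJECTED:
                    events.add("rejected: " + message.getExceptionMsg());
                    break;
                case ABANDONED:
                    events.add("abandoned");
                    break;
                default:
                    // intermediate state, nothing to do yet
                    break;
            }
        }
    }

    public static void main(String[] args) {
        RecordingCallback callback = new RecordingCallback();
        callback.update(new FakeMessage(State.TRANSMITTED, null, null));
        callback.update(new FakeMessage(State.REPLIED, "42", null));
        System.out.println(callback.events);
    }
}
```

Because the callback is invoked on every state change, it has to tolerate intermediate states; here they fall through to the default branch.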

Note that the callback is executed in the context of the agent worker thread and should not block unnecessarily, as this would prevent the agent worker thread from making progress with other tasks. It is safe to call non-blocking functions on the same agent.
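If handling a state change involves slow work, one option is for the callback to hand that work to another thread and return immediately. The sketch below illustrates the idea with standard java.util.concurrent classes only. StateCallback is a hypothetical stand-in for the callback interface (it receives a String instead of an OutgoingMessage), and the 50 ms sleep merely simulates slow processing.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class CallbackHandOffSketch {

    // Hypothetical stand-in for the callback interface; not part of YAMI4.
    interface StateCallback {
        void update(String state) throws Exception;
    }

    static class HandOffCallback implements StateCallback {
        private final ExecutorService executor =
            Executors.newSingleThreadExecutor();

        // Written by the executor thread, read by the caller.
        final List<String> processed = new CopyOnWriteArrayList<>();

        @Override
        public void update(String state) {
            // Only schedules the work; the calling thread is not blocked.
            executor.execute(() -> processed.add(slowHandle(state)));
        }

        private String slowHandle(String state) {
            try {
                Thread.sleep(50); // simulated slow processing
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
            return "handled " + state;
        }

        // Stops accepting work and waits for scheduled work to finish.
        boolean shutdownAndWait(long timeoutMillis) {
            executor.shutdown();
            try {
                return executor.awaitTermination(
                    timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        HandOffCallback callback = new HandOffCallback();
        callback.update("REPLIED");
        callback.shutdownAndWait(5000);
        System.out.println(callback.processed);
    }
}
```

A single-threaded executor keeps the notifications in the order in which they were reported. In a Swing program the hand-off could instead go through SwingUtilities.invokeLater, so that the reply is processed on the event dispatch thread.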

The outgoing message object and its callback are automatically cleaned up after one of the completing message states has been reported. The outgoing message object can still be closed explicitly with close() before the message completes; this is recommended to avoid memory leaks when the message reply or rejection is known never to arrive.