Inspirel banner

Programming Distributed Systems with YAMI4

9.3.3 Java

The Java publisher and subscriber programs can be compiled by running the following command, assuming that the Java library was already compiled:

$ javac -cp ../../../../lib/yami4.jar *.java

Similar classpath argument will be needed to run the programs. For example, the following is necessary to start the publisher program with the shortest possible target:

$ java -cp ../../../../lib/yami4.jar:. Publisher 'tcp://*:*'

The publisher program is entirely implemented in the Publisher.java file, which is dissected in detail below.

First the relevant YAMI4 classes are imported:

import com.inspirel.yami.Agent;
import com.inspirel.yami.Parameters;
import com.inspirel.yami.ValuePublisher;

The ValuePublisher class provides the API for managing data-publication entities.

Some standard packages are also needed to handle random number generation:

import static java.lang.Math.abs;
import java.util.Random;

The publisher is very similar in its structure to a typical server, as already presented in previous examples, but here an important new entity is the ValuePublisher object that encapsulates the subscription management for a single data source:

public class Publisher {

    public static void main(String[] args) {
        if (args.length != 1) {
            System.out.println(
                "expecting one parameter: " +
                "publisher destination");
            return;
        }

        String publisherAddress = args[0];

        try {
            ValuePublisher randomValue =
                new ValuePublisher();

The randomValue object declared above will be later registered as a regular YAMI4 object that can be seen by remote clients.

            Agent publisherAgent = new Agent();

            String resolvedAddress =
                publisherAgent.addListener(publisherAddress);

            System.out.println(
                "The publisher is listening on " +
                resolvedAddress);

The value publisher object is registered as a regular object. In this example the "random_number" is the name that will be seen by remote clients.

            publisherAgent.registerValuePublisher(
                "random_number", randomValue);

The publisher performs a simple work of generating random numbers every second and publishing them to the currently subscribed clients.

All values that are published need to be expressed in terms of the parameters object, which is consistent with the general YAMI4 data model. In this example a single random value is published as an integer entry in the parameters object:

            // publish random values forever
            Parameters content = new Parameters();
            Random generator = new Random();
            while (true) {
                int random =
                    abs(generator.nextInt()) % 100;
                content.setInteger("value", random);

                System.out.println(
                    "publishing value " + random);

                randomValue.publish(content);

The above operation is asynchronous just like any other message-sending operation. Internally, it scans the list of currently subscribed clients and sends them an update message with the given parameters object. The actual messages are sent in background.

The publisher's main loop involves one-second delay, but in practical programs data publishing need not be periodic.

                Thread.sleep(1000);
            }

The publisher concludes with basic exception handling.

        } catch (Exception ex) {
            System.out.println(
                "error: " + ex.getMessage());
        }
    }
}

The subscriber program is implemented in a single Subscriber.java file.

Subscribers have both client- and server-side properties in the sense that they need to be able to both send a subscription message to the publisher - which is the client-side activity - and to receive the updates - which makes them act like servers.

The example subscriber code starts with relevant imports:

import com.inspirel.yami.Agent;
import com.inspirel.yami.IncomingMessage;
import com.inspirel.yami.IncomingMessageCallback;
import com.inspirel.yami.Parameters;

public class Subscriber {

To enable the subscriber to receive subscription updates, the appropriate message handler needs to be implemented. As with other servers, this is achieved with a nested class implementing the IncomingMessageCallback interface with a single call() function that accepts the incoming message object.

The subscriber simply prints the received value on the console:

    private static class UpdateHandler
        implements IncomingMessageCallback {

        @Override
        public void call(IncomingMessage im)
            throws Exception {

            Parameters content = im.getParameters();

            int value = content.getInteger("value");

            System.out.println("received update " + value);
        }
    }

The main part of the program obtains the publisher target from its command-line arguments and registers the message handler to enable proper routing of subscription updates.

An important property of this program is that even though it technically acts like a server (it receives updates via registered message handler), it does not have any listener. The listener is not needed here, because the subscription updates will be sent back using the same communication channel that is initially used for sending the subscription message - this is an example of reverting the channel's natural direction.

    public static void main(String[] args) {
        if (args.length != 1) {
            System.out.println(
                "expecting one parameter: " +
                "publisher destination");
            return;
        }

        String publisherAddress = args[0];

        try {
            Agent subscriberAgent = new Agent();

            // prepare subscription update callback

            final String updateObjectName =
                "update_handler";

            subscriberAgent.registerObject(
                updateObjectName, new UpdateHandler());

Once the message handler is registered as a regular object, the subscription message can be sent to the publisher. It is important to properly inform the publisher of the intended object name where the updates are to be delivered - here, the object is registered with the name "update_handler" and this name is sent to the publisher as the "destination_object" entry in the subscription message's payload:

            // subscribe to the producer

            Parameters params = new Parameters();
            params.setString(
                "destination_object", updateObjectName);

            subscriberAgent.sendOneWay(publisherAddress,
                "random_number", "subscribe", params);

            System.out.println(
                "subscribed, waiting for updates");

Above, the message that is sent to publisher is named ``subscribe'', which is automatically recognized by the value publisher object. Another recognized name is ``unsubscribe'', which causes the given subscriber to be removed from the list.

After all these preparations, the subscriber is ready to receive updates and falls into an infinite loop.

            // block forever
            // and receive updates in background
            while (true) {
                Thread.sleep(10000);
            }
        } catch (Exception ex) {
            System.out.println(
                "error: " + ex.getMessage());
        }
    }
}