Documentation

Documentation versions (currently viewingVaadin 23)

Implementing Support for Clustered Environments

Important
Experimental Feature

This is currently considered an experimental feature, which means that its behavior, API, and look and feel might still change. In order to use the Backend API, it must be explicitly enabled with a feature flag. See the Feature Flag section for how to do this.

Using Collaboration Engine out of the box in an application running in a clustered environment would result in users being able to collaborate only with others connected to the same app instance (i.e. on the same node). To properly run clustered application deployments, Collaboration Engine provides the Backend superclass that can be extended to support such multi-instance environments.

In this article we will show how to implement a custom backend to support clustering based on the Hazelcast platform.

Understanding the concept of Event Log

A custom Backend implementation is the gateway Collaboration Engine uses to obtain access to Event Logs. An Event Log is a strictly ordered log of submitted events involving Topic data, such as newly added items or value changes. The EventLog API provides methods to submit new events to the log and to add a subscriber to receive all past and future events. All events are marked by a unique identifier and the API provides a method to remove all events in the log before a given identifier.

Implementing an Event Log for Hazelcast

Let’s start implementing the EvengLog interface for our reference Hazelcast platform. Hazelcast provides a very straightforward streaming API based on shared maps and lists. The Event Log can be easily implemented making use of a Hazelcast IList, but first we need a class to store both the event identifier and payload.

private static final class IdAndPayload implements Serializable {

    private final UUID id;

    private final String payload;

    private IdAndPayload(UUID id, String payload) {
        this.id = id;
        this.payload = payload;
    }
}

Once we have that, we can start implementing the interface methods. The submitEvent takes the event identifier and payload, so we can simply store them in a new IdAndPayload object and add it to the Hazelcast list for this Event Log.

public static class HazelcastEventLog implements EventLog {

    private final IList<IdAndPayload> list;
    public HazelcastEventLog(IList<IdAndPayload> list) {
        this.list = list;
    }
    @Override
    public void submitEvent(UUID id, String payload) {
        list.add(new IdAndPayload(id, payload));
    }
}

To implement subscriptions to events, we’re going to add an item listener to the Hazelcast list. The subscriber receives all past and future events for this Event Log. A newly added subscriber should initially receive all previous events in the log based on their original order, so that it can catch up with the latest state. New events should be delivered (in order) only after all previous events have been delivered. It is not allowed to invoke the subscribe method again until the previous subscriber has been removed.

The subscribe method optionally takes the identifier of the last known event, so that only newer events will be notified to the subscriber. If an identifier is provided and not found in the Event Log, a EventIdNotFoundException should be thrown.

Tip
Exception handling
When the exception is caught by the code calling this method, it may want to re-attempt the subscription with another identifier.
@Override
public synchronized Registration subscribe(UUID newerThan,
        BiConsumer<UUID, String> eventSubscriber)
        throws EventIdNotFoundException {
    if (this.eventSubscriber != null) {
        throw new IllegalStateException(); 1
    }

    if (newerThan != null) {
        Optional<IdAndPayload> newerThanIdAndEvent = list.stream()
                .filter(item -> newerThan.equals(item.id)).findFirst();
        if (newerThanIdAndEvent.isEmpty()) {
            throw new EventIdNotFoundException(
                    "newerThan doesn't " + "exist in the log."); 2
        }
    }
    this.newerThan = newerThan;
    this.eventSubscriber = eventSubscriber;
    nextEventIndex = 0;

    UUID registrationId = list
            .addItemListener(new ItemListener<IdAndPayload>() {
                @Override
                public void itemAdded(ItemEvent<IdAndPayload> item) {
                    deliverEvents();
                }

                @Override
                public void itemRemoved(ItemEvent<IdAndPayload> item) {
                    handleRemoveItem();
                }
            }, false); 3

    // Deliver initial events
    deliverEvents(); 4

    return () -> {
        synchronized (this) {
            list.removeItemListener(registrationId);
            this.eventSubscriber = null;
        }
    }; 5
}
  1. Only a single subscriber is allowed, so we throw an exception if one is already set.

  2. If an event identifier is provided, we check if it exists in the list. If it doesn’t, we throw a EventIdNotFoundException.

  3. We add an item listener to the Hazelcast list to handle new items and removed ones.

  4. Then all past events are initially delivered.

  5. Finally we return a Registration that can be used to remove the subscriber.

Let’s now dive in into the custom method that effectively delivers the events, the deliverEvents method. This is a synchronized method to prevent it being invoked by multiple threads simultaneously and to avoid duplicate events being notified to the subscriber. The method keeps track of the Hazelcast list index to identify the next event and increments this index until all the events are delivered. If an event identifier has been set as the starting point, no events are delivered until that identifier is reached.

private synchronized void deliverEvents() {
    while (nextEventIndex < list.size()) {
        IdAndPayload event = list.get(nextEventIndex++);
        if (this.newerThan == null) {
            eventSubscriber.accept(event.id, event.payload);
        } else {
            if (event.id.equals(newerThan)) {
                this.newerThan = null;
            }
        }
    }
}

Finally, the last method we need to implement for the EventLog interface is the truncate method. This method serves the purpose to limit the number of events contained in the log, avoiding it growing infinitely. It takes the identifier of the oldest known event that should be preserved, or — if a null identifier is provided — it empties the whole log.

To implement this behavior for Hazelcast, we create a simple Predicate and pass it to the list’s removeIf method.

@Override
public synchronized void truncate(UUID olderThan) {
    Predicate<IdAndPayload> filter = e -> true;
    if (olderThan != null) {
        Optional<IdAndPayload> olderThanEvent = list.stream()
                .filter(item -> olderThan.equals(item.id)).findFirst();
        if (olderThanEvent.isEmpty()) {
            // NOOP
            return;
        }
        filter = new Predicate<>() {
            boolean found;

            @Override
            public boolean test(IdAndPayload event) {
                found = found || olderThan.equals(event.id);
                return !found;
            }
        };
    }
    list.removeIf(filter);
}

Opening an Event Log

Now that we have a Hazelcast implementation of the EventLog interface, to be able to create and get instances of it we need to extend the Backend class. Since our implementation only depends on a single Hazelcast IList, it is very easy to implement the openEventLog method and we do so by returning a new instance of the HazelcastEventLog with the list named after the logId parameter.

@Override
public EventLog openEventLog(String logId) {
    return new HazelcastEventLog(hz.getList(logId));
}

Managing Event Log Snapshots

A snapshot is an opaque representation of data at a certain moment in time. It can be used, for example, by nodes joining a cluster to quickly catch-up with a recent state of data without the need to replay all the events from the beginning of time. Snapshots are identified by a name, and each version of a named snapshot is assigned with an additional unique identifier.

Loading a Snapshot

To load the latest version of a snapshot, the Backend class provides the loadLatestSnapshot method. This method can be simply implemented for Hazelcast using a map to store the latest available snapshot.

@Override
public CompletableFuture<Snapshot> loadLatestSnapshot(String name) {
    return CompletableFuture.completedFuture(snapshots.get(name));
}

Submitting a new Snapshot

To submit a new snapshot of data the replaceSnapshot method should be used. It takes the name of the snapshot, the expected unique identifier of the latest snapshot, the unique identifier of the new snapshot and finally the payload of the snapshot itself. To implement this method for Hazelcast, we need some logic to verify that the latest snapshot is the expected one.

@Override
public CompletableFuture<Void> replaceSnapshot(String name, UUID expectedId,
        UUID newId, String payload) {
    Snapshot currentSnapshot = snapshots.computeIfAbsent(name,
            k -> new Snapshot(null, null));

    if (Objects.equals(expectedId, currentSnapshot.getId())) {
        Snapshot idAndPayload = new Snapshot(newId, payload);
        snapshots.put(name, idAndPayload);
    }

    return CompletableFuture.completedFuture(null);
}

Node identity and Membership events

The primary purpose of the Backend API is to support collaboration in applications deployed in clustered environments. Every Backend instance represents a member of the cluster and is uniquely identified by a UUID which should be returned by the getNodeId method. For our Hazelcast implementation, we simply return the local member identifier.

@Override
public UUID getNodeId() {
    return hz.getCluster().getLocalMember().getUuid();
}

When multiple Backend instances are involved, it is necessary to know when a they join or leave the cluster. For this purpose, the Backend should provide an implementation of the addMembershipListener method that takes a MembershipListener and notifies it when cluster members join or leave. Hazelcast itself uses the same concept, so the implementation is very straightforward: we only need to map Hazelcast’s events to Collaboration Engine’s MembershipEvent events, which take the MembershipEventType (JOIN or LEAVE) and the identifier of the member.

@Override
public Registration addMembershipListener(
        MembershipListener membershipListener) {
    UUID registrationId = hz.getCluster()
            .addMembershipListener(new InitialMembershipListener() {

                @Override
                public void init(InitialMembershipEvent event) {
                    event.getMembers()
                            .forEach(member -> submitEvent(
                                    MembershipEventType.JOIN,
                                    member.getUuid()));
                }

                @Override
                public void memberAdded(MembershipEvent membershipEvent) {
                    submitEvent(MembershipEventType.JOIN,
                            membershipEvent.getMember().getUuid());
                }

                @Override
                public void memberRemoved(MembershipEvent membershipEvent) {
                    submitEvent(MembershipEventType.LEAVE,
                            membershipEvent.getMember().getUuid());
                }

                private void submitEvent(MembershipEventType type,
                        UUID id) {
                    membershipListener.handleMembershipEvent(
                            new com.vaadin.collaborationengine.MembershipEvent(
                                    type, id, getCollaborationEngine()));
                }
            });
    return () -> hz.getCluster().removeMembershipListener(registrationId);
}
Note
Returning a registration object
The addMembershipListener should return a Registration object that can be used later to remove the listener.

Feature Flag

In order to use the Backend API, it must be explicitly enabled with a feature flag:

  1. Create a src/main/resources/vaadin-featureflags.properties file in your application folder

  2. Add the following content: com.vaadin.experimental.collaborationEngineBackend=true

  3. Restart the application.

AB472607-53E3-481D-AF99-93E3F6ED8B61