Chronicle Queue Enterprise replication performs simple uni-directional, multi-way replication by copying a Chronicle queue from one host to one or more other hosts using TCP/IP.
The queue that we write to is known as the source queue. The copy (or copies) is the sink queue.
Important: Chronicle requires exclusive write access to the sink queue. The sink should be treated as read-only, and never written to manually.
If we allowed messages to be written to the sink, it is easy to imagine a case where different messages are written to the source and the sink at the same time, leaving the messages on the source queue and the sink queue in very different orders. Because of this, we do not allow messages to be written to the sink queue. Queues must maintain message ordering, and ensure that the source queue and sink queue are identical.
One of the reasons that we enforce this is that, in a microservices architecture, true 'replay-ability' requires being able to guarantee the consistency and ordering of the messages in queues. Therefore, any service running on the source machine must receive the same events, in the same order, as the service running on the sink machine.
Any message that is written to the source is copied to the sink. The sink therefore becomes a mirror image of the source.
At startup, replication locks the source queue and waits for the sink(s) to report the number of records they hold. If a sink has more records, the extra records are replayed back to the source before it is unlocked and made usable again. This is done to provide automatic data re-synchronisation after a failover to the sink.
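The startup re-synchronisation step can be sketched in plain Java. This is a simplified, self-contained model, not the actual Chronicle Queue Enterprise protocol: queues are represented as in-memory lists, and the record count stands in for the real excerpt index.

```java
import java.util.ArrayList;
import java.util.List;

public class ResyncSketch {
    // Simplified model of startup re-synchronisation: if the sink holds more
    // records than the source (e.g. after a failover, where writers moved to
    // the sink), the surplus records are replayed back to the source before
    // normal replication resumes.
    static void resync(List<String> source, List<String> sink) {
        for (int i = source.size(); i < sink.size(); i++) {
            source.add(sink.get(i)); // replay the surplus back to the source
        }
    }

    public static void main(String[] args) {
        List<String> source = new ArrayList<>(List.of("m1", "m2"));
        // The sink is ahead by one record after a failover.
        List<String> sink = new ArrayList<>(List.of("m1", "m2", "m3"));
        resync(source, sink);
        System.out.println(source); // source and sink are identical again
    }
}
```

After `resync` returns, both lists hold the same records in the same order, which is the invariant the real mechanism restores before unlocking the source queue.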
The set of hosts onto which the queue is replicated is defined as a cluster. The configuration for a cluster is as follows:
!QueueReplicationCfg {
  cluster: {
    context: !QueueClusterContext {
      heartbeatIntervalMs: 300000,
      heartbeatTimeoutMs: 500000,
      baseSourcePath: "replica/source",
      baseSinkPath: "replica/sink",
    }
    host1: {
      hostId: 1,
      connectUri: host.port1,
    }
    host2: {
      hostId: 2,
      connectUri: host.port2,
    }
    ...
  },
  queues: {
    queue1: {
      path: "queue1",
      acknowledge: false,
      masterId: 1,
    }
  }
}
In the configuration shown above, the queue queue1 is set to be replicated from host1 (as indicated by masterId) to all other hosts defined for the cluster.
Queues will use storage paths defined by baseSourcePath and baseSinkPath for source and sink, respectively, followed by the queue name. For this example, the source queue will be at replica/source/queue1, while the sink will be written to replica/sink/queue1.
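The path resolution described above can be sketched in plain Java. Note that resolveQueuePath is a hypothetical helper written for illustration, not part of the Chronicle API:

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class QueuePathExample {
    // Hypothetical helper: joins a configured base path with the queue name,
    // mirroring how baseSourcePath/baseSinkPath are combined with the
    // queue's path from the cluster configuration.
    static Path resolveQueuePath(String basePath, String queueName) {
        return Paths.get(basePath, queueName);
    }

    public static void main(String[] args) {
        // With the example configuration, queue1 resolves to:
        System.out.println(resolveQueuePath("replica/source", "queue1"));
        System.out.println(resolveQueuePath("replica/sink", "queue1"));
    }
}
```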
To start replicating data, the user must create an instance of ReplicatedQueue for each host. This is done as follows:
ReplicatedQueue repl = new ReplicatedQueue(config(), hostId);
repl.startReplication();
...
// shutdown
repl.shutdown();
The test below demonstrates replication between two hosts: a message appended to the source queue on host 1 appears on the sink queue on host 2.

@Test
public void shouldReplicate() throws Exception {
    YamlLogging.setAll(false);
    IOTools.deleteDirWithFiles("replica", 10);

    TCPRegistry.createServerSocketChannelFor(
            "host.port1",
            "host.port2",
            "host.port3");

    startupHost((byte) 1);
    startupHost((byte) 2);

    ReplicatedQueueCfg qCfg = CONFIG.queues().values().stream()
            .filter(q -> q.path.equals("queue1"))
            .findFirst()
            .orElseGet(() -> {
                fail("Queue queue1 not found");
                return null;
            });

    QueueClusterContext ctx = CONFIG.cluster().clusterContext();
    assert ctx != null;

    // Open the source queue on host 1 and the sink queue on host 2.
    SingleChronicleQueue source = QueueBuilderFromConfig.queueFromConfig(
            qCfg, CONFIG::queueConfig, path -> ctx.getSourcePath(path, (byte) 1));
    SingleChronicleQueue sink = QueueBuilderFromConfig.queueFromConfig(
            qCfg, CONFIG::queueConfig, path -> ctx.getSinkPath(path, (byte) 2));

    // Nothing has been written yet, so the sink should be empty.
    ExcerptTailer tailer = sink.createTailer();
    assertNull(poll(tailer, 200L));

    ExcerptAppender appender = source.acquireAppender();

    // Write to the source; the message should appear on the sink.
    try (DocumentContext dc = appender.writingDocument()) {
        dc.wire().write("test").text("Hello replica");
    }
    String poll = poll(tailer);
    assertEquals("Hello replica", poll);

    try (DocumentContext dc = appender.writingDocument()) {
        dc.wire().write("test").text("Hello replica2");
    }
    poll = poll(tailer);
    assertEquals("Hello replica2", poll);

    // No further messages should arrive on the sink.
    poll = poll(tailer, 500L);
    assertNull(poll);
}
If the Chronicle-Queue is cleared in the primary host, will this be replicated in the secondary?
There’s no such thing as clearing the queue (it’s unsupported). You can manually delete the files, but that will have unpredictable results unless you recreate your queues and restart your process.

The queue is append-only, and replication works in an append-only manner; it will never delete anything. Replication continues from where it left off, which is determined by the entry index. This means that if you delete the files for previous roll cycles, replication will not even notice. However, if you delete the file for the current cycle and later recreate it, without deleting the corresponding file on the other host, you will lose data, as the same index in the primary queue will then correspond to old entries in the secondary queue.
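The index-based, append-only resume behaviour described in this answer can be illustrated with a simplified model. This is not the Chronicle API: in-memory lists stand in for the queue files, and a plain integer stands in for the entry index that replication tracks.

```java
import java.util.ArrayList;
import java.util.List;

public class AppendOnlyReplicationSketch {
    // Simplified model of append-only replication: copy every source entry
    // from nextIndex onward to the sink, never deleting anything, and return
    // the index from which the next replication pass will resume.
    static int replicate(List<String> source, List<String> sink, int nextIndex) {
        while (nextIndex < source.size()) {
            sink.add(source.get(nextIndex));
            nextIndex++;
        }
        return nextIndex;
    }

    public static void main(String[] args) {
        List<String> source = new ArrayList<>(List.of("a", "b"));
        List<String> sink = new ArrayList<>();

        int next = replicate(source, sink, 0); // copies "a" and "b"

        source.add("c");
        next = replicate(source, sink, next);  // resumes at index 2, copies only "c"
        System.out.println(sink);
    }
}
```

In this model, the danger the answer describes is visible: if the source entries behind an already-replicated index were deleted and rewritten, the sink would still hold the old entries at those indexes, and the two queues would silently diverge.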