Skip to content

Commit

Permalink
save
Browse files Browse the repository at this point in the history
  • Loading branch information
balazskreith committed Sep 28, 2024
1 parent cbc1bc4 commit 2749532
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 7 deletions.
30 changes: 25 additions & 5 deletions src/Hamok.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1117,11 +1117,16 @@ export class Hamok<AppData extends Record<string, unknown> = Record<string, unkn
logger.trace('%s Received join notification from itself %o', this.localPeerId, notification);
break;
}
if (this.raft.remotePeers.has(notification.sourcePeerId)) {
logger.trace('%s Received join notification from %s, but it is already in the remote peers', this.localPeerId, notification.sourcePeerId);
break;
}

this.addRemotePeerId(notification.sourcePeerId);

if (this.raft.leaderId === this.localPeerId) {
this._sendEndpointNotification(notification.sourcePeerId, undefined);
}
this.addRemotePeerId(notification.sourcePeerId);

break;
}
case HamokMessageType.ENDPOINT_STATES_NOTIFICATION: {
Expand All @@ -1147,10 +1152,15 @@ export class Hamok<AppData extends Record<string, unknown> = Record<string, unkn
this.removeRemotePeerId(peerId);
}

let foundLocalPeerId = false;

for (const peerId of endpointStateNotification.activeEndpointIds ?? []) {
if (this.remotePeerIds.has(peerId)) continue;
if (peerId === this.localPeerId) continue;

if (peerId === this.localPeerId) {
foundLocalPeerId = true;
continue;
}

logger.debug('%s Received endpoint state notification from %s (supposed to be the leader), and in that it has %s in its active endpoints, therefore we need to add it',
this.localPeerId,
endpointStateNotification.sourceEndpointId,
Expand All @@ -1164,6 +1174,16 @@ export class Hamok<AppData extends Record<string, unknown> = Record<string, unkn
this.addRemotePeerId(endpointStateNotification.sourceEndpointId);
}

if (!foundLocalPeerId) {
// we need to add the local peer id to the active endpoints of the leader

const joinMsg = this._codec.encodeJoinNotification(new JoinNotification(this.localPeerId, endpointStateNotification.sourceEndpointId));

this._emitMessage(joinMsg, endpointStateNotification.sourceEndpointId);

break;
}

// we add 2 becasue the nextIndex of the leader has not been reserved, and
const possibleLowestIndex = endpointStateNotification.leaderNextIndex - endpointStateNotification.numberOfLogs + 2;

Expand All @@ -1173,7 +1193,7 @@ export class Hamok<AppData extends Record<string, unknown> = Record<string, unkn
) {
// we make a warn message only if it is not the first join

logger.warn('%s Commit index of this peer (%d) is lower than the smallest commit index (%s) from remote peers resetting the logs',
logger.warn('%s Commit index of this peer (%d) is lower than the smallest commit index (%d) from remote peers resetting the logs',
this.localPeerId,
this.raft.logs.commitIndex,
possibleLowestIndex
Expand Down
6 changes: 5 additions & 1 deletion src/collections/HamokConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,11 @@ export class HamokConnection<K, V> extends EventEmitter {
// message.type === HamokMessageType.INSERT_ENTRIES_REQUEST ? this.codec.valueCodec.decode(message.values[0]) : -1
// );
if (commitIndex <= this._appliedCommitIndex) {
return logger.warn('Received message with commit index %d is older or equal than the last applied commit index %d', commitIndex, this._appliedCommitIndex);
return logger.warn('Connection for id %s Received message with commit index %d is older or equal than the last applied commit index %d',
this.config.storageId,
commitIndex,
this._appliedCommitIndex
);
}
// only in test purposes
// if (this._appliedCommitIndex + 1 !== commitIndex) {
Expand Down
2 changes: 1 addition & 1 deletion src/raft/MemoryStoredRaftLogs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ export class MemoryStoredRaftLogs extends EventEmitter implements RaftLogs {
this._firstIndex = newCommitIndex;
this._memoryEstimateBytesLength = 0;

logger.debug(`Logs are reset. new values: commitIndex: ${this._commitIndex}, nextIndex: ${this._nextIndex}, lastApplied: ${this._firstIndex}`);
logger.warn(`Logs are reset. new values: commitIndex: ${this._commitIndex}, nextIndex: ${this._nextIndex}, lastApplied: ${this._firstIndex}`);
}

public removeUntil(newFirstIndex: number): void {
Expand Down

0 comments on commit 2749532

Please sign in to comment.