Skip to content

Commit

Permalink
Create separate channels for send-receive pair
Browse files Browse the repository at this point in the history
  • Loading branch information
HindujaB committed Nov 21, 2023
1 parent 7fd8965 commit 5409e2e
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1145,8 +1145,6 @@ public void visit(BLangForkJoin forkJoin) {
public void visit(BLangWorkerReceive workerReceive) {
BIRBasicBlock thenBB = new BIRBasicBlock(this.env.nextBBId());
addToTrapStack(thenBB);
String channel = workerReceive.workerIdentifier.value + "->" + env.enclFunc.workerName.value;

BIRVariableDcl tempVarDcl = new BIRVariableDcl(workerReceive.getBType(), this.env.nextLocalVarId(names),
VarScope.FUNCTION, VarKind.TEMP);
this.env.enclFunc.localVars.add(tempVarDcl);
Expand All @@ -1155,8 +1153,8 @@ public void visit(BLangWorkerReceive workerReceive) {

boolean isOnSameStrand = DEFAULT_WORKER_NAME.equals(this.env.enclFunc.workerName.value);

this.env.enclBB.terminator = new BIRTerminator.WorkerReceive(workerReceive.pos, names.fromString(channel),
lhsOp, isOnSameStrand, thenBB, this.currentScope);
this.env.enclBB.terminator = new BIRTerminator.WorkerReceive(workerReceive.pos,
names.fromString(workerReceive.channel), lhsOp, isOnSameStrand, thenBB, this.currentScope);

this.env.enclBasicBlocks.add(thenBB);
this.env.enclBB = thenBB;
Expand All @@ -1174,12 +1172,10 @@ public void visit(BLangWorkerAsyncSendExpr asyncSendExpr) {
this.env.enclFunc.localVars.add(tempVarDcl);
BIROperand lhsOp = new BIROperand(tempVarDcl);
this.env.targetOperand = lhsOp;

String channelName = this.env.enclFunc.workerName.value + "->" + asyncSendExpr.workerIdentifier.value;
boolean isOnSameStrand = DEFAULT_WORKER_NAME.equals(this.env.enclFunc.workerName.value);

this.env.enclBB.terminator = new BIRTerminator.WorkerSend(
asyncSendExpr.pos, names.fromString(channelName), dataOp, isOnSameStrand, false, lhsOp,
asyncSendExpr.pos, names.fromString(asyncSendExpr.channel), dataOp, isOnSameStrand, false, lhsOp,
thenBB, this.currentScope);

this.env.enclBasicBlocks.add(thenBB);
Expand All @@ -1198,12 +1194,10 @@ public void visit(BLangWorkerSyncSendExpr syncSend) {
this.env.enclFunc.localVars.add(tempVarDcl);
BIROperand lhsOp = new BIROperand(tempVarDcl);
this.env.targetOperand = lhsOp;

String channelName = this.env.enclFunc.workerName.value + "->" + syncSend.workerIdentifier.value;
boolean isOnSameStrand = DEFAULT_WORKER_NAME.equals(this.env.enclFunc.workerName.value);

this.env.enclBB.terminator = new BIRTerminator.WorkerSend(
syncSend.pos, names.fromString(channelName), dataOp, isOnSameStrand, true, lhsOp,
syncSend.pos, names.fromString(syncSend.channel), dataOp, isOnSameStrand, true, lhsOp,
thenBB, this.currentScope);

this.env.enclBasicBlocks.add(thenBB);
Expand All @@ -1216,10 +1210,9 @@ public void visit(BLangWorkerFlushExpr flushExpr) {
addToTrapStack(thenBB);

//create channelDetails array
BIRNode.ChannelDetails[] channels = new BIRNode.ChannelDetails[flushExpr.workerIdentifierList.size()];
BIRNode.ChannelDetails[] channels = new BIRNode.ChannelDetails[flushExpr.workerChannels.size()];
int i = 0;
for (BLangIdentifier workerIdentifier : flushExpr.workerIdentifierList) {
String channelName = this.env.enclFunc.workerName.value + "->" + workerIdentifier.value;
for (String channelName : flushExpr.workerChannels) {
boolean isOnSameStrand = DEFAULT_WORKER_NAME.equals(this.env.enclFunc.workerName.value);
channels[i] = new BIRNode.ChannelDetails(channelName, isOnSameStrand, true);
i++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8151,9 +8151,16 @@ public void visit(BLangWorkerReceive workerReceiveNode) {
public void visit(BLangWorkerFlushExpr workerFlushExpr) {
workerFlushExpr.workerIdentifierList = workerFlushExpr.cachedWorkerSendStmts
.stream().map(send -> send.workerIdentifier).distinct().collect(Collectors.toList());
populateWorkerChannelList(workerFlushExpr);
result = workerFlushExpr;
}

private void populateWorkerChannelList(BLangWorkerFlushExpr workerFlushExpr) {
for (BLangWorkerAsyncSendExpr sendStmt : workerFlushExpr.cachedWorkerSendStmts) {
workerFlushExpr.workerChannels.add(sendStmt.channel);
}
}

@Override
public void visit(BLangTransactionalExpr transactionalExpr) {
BInvokableSymbol isTransactionalSymbol =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.ballerinalang.model.tree.OperatorKind;
import org.ballerinalang.model.tree.TopLevelNode;
import org.ballerinalang.model.tree.expressions.RecordLiteralNode;
import org.ballerinalang.model.tree.expressions.WorkerSendExpressionNode;
import org.ballerinalang.model.tree.expressions.XMLNavigationAccess;
import org.ballerinalang.model.tree.statements.StatementNode;
import org.ballerinalang.model.tree.statements.VariableDefinitionNode;
Expand Down Expand Up @@ -3664,14 +3665,20 @@ private void validateWorkerInteractions(WorkerActionSystem workerActionSystem, A
} else {
this.validateWorkerActionParameters((BLangWorkerAsyncSendExpr) currentAction, receive);
}
String channelName = generateChannelName(workerActionSystem, (WorkerSendExpressionNode) currentAction, receive);
receive.channel = channelName;
if (currentAction.getKind() == NodeKind.WORKER_SYNC_SEND) {
((BLangWorkerSyncSendExpr) currentAction).channel = channelName;
} else {
((BLangWorkerAsyncSendExpr) currentAction).channel = channelName;
}
otherSM.next();
data.workerSystemMovementSequence++;
worker.next();
data.workerSystemMovementSequence++;


systemRunning = true;
String channelName = generateChannelName(worker.workerId, otherSM.workerId);
otherSM.node.sendsToThis.add(channelName);

worker.node.sendsToThis.add(channelName);
Expand All @@ -3693,6 +3700,14 @@ private void validateWorkerInteractions(WorkerActionSystem workerActionSystem, A
}
}

private String generateChannelName(WorkerActionSystem workerActionSystem, WorkerSendExpressionNode sendExpr,
BLangWorkerReceive receive) {
String channel = workerActionSystem.createChannel(receive.workerIdentifier.value,
sendExpr.getWorkerName().getValue());
workerActionSystem.addWorkerChannel(sendExpr, receive, channel);
return channel;
}

private boolean validateWorkerInteractionsAfterWaitAction(WorkerActionSystem workerActionSystem) {
boolean isValid = true;
for (WorkerActionStateMachine worker : workerActionSystem.finshedWorkers) {
Expand Down Expand Up @@ -4043,6 +4058,8 @@ private static class WorkerActionSystem {
public List<WorkerActionStateMachine> finshedWorkers = new ArrayList<>();
private Stack<WorkerActionStateMachine> workerActionStateMachines = new Stack<>();
private Map<BLangNode, SymbolEnv> workerInteractionEnvironments = new IdentityHashMap<>();
private Map<String, Integer> channelCounts = new HashMap<>();
private Map<WorkerChannel, String> workerChannels = new HashMap<>();
private boolean hasErrors = false;


Expand Down Expand Up @@ -4075,6 +4092,17 @@ public Location getRootPosition() {
return this.finshedWorkers.iterator().next().pos;
}

public String createChannel(String sender, String receiver) {
String key = sender + "->" + receiver;
int count = channelCounts.getOrDefault(key, 0);
channelCounts.put(key, ++count);
return key + "-" + count;
}

public void addWorkerChannel(WorkerSendExpressionNode sendExpr, BLangWorkerReceive receive, String key) {
this.workerChannels.put(new WorkerChannel(sendExpr, receive), key);
}

@Override
public String toString() {
return this.finshedWorkers.toString();
Expand All @@ -4094,6 +4122,20 @@ private SymbolEnv getActionEnvironment(BLangNode currentAction) {
}
}

/**
* This class represents a worker channel which contains a send and receive action.
*/
private static class WorkerChannel {

public WorkerSendExpressionNode sendAction;
public BLangWorkerReceive receiveAction;

public WorkerChannel(WorkerSendExpressionNode sendAction, BLangWorkerReceive receiveAction) {
this.sendAction = sendAction;
this.receiveAction = receiveAction;
}
}

/**
* This class represents a state machine to maintain the state of the send/receive
* actions of a worker.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class BLangWorkerAsyncSendExpr extends BLangExpression implements WorkerS
public BSymbol workerSymbol;
public BType workerType;
public BType sendType;
public String channel;

@Override
public BLangExpression getExpression() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class BLangWorkerFlushExpr extends BLangExpression implements WorkerFlush
public BSymbol workerSymbol;
public List<BLangIdentifier> workerIdentifierList = new ArrayList<>();
public List<BLangWorkerAsyncSendExpr> cachedWorkerSendStmts = new ArrayList<>();
public List<String> workerChannels = new ArrayList<>();

@Override
public NodeKind getKind() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class BLangWorkerReceive extends BLangExpression implements WorkerReceive
public SymbolEnv env;
public BType workerType;
public BType matchingSendsError;
public String channel;

@Override
public BLangIdentifier getWorkerName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class BLangWorkerSyncSendExpr extends BLangExpression implements WorkerSe
public SymbolEnv env;
public BType workerType;
public BType sendType;
public String channel;

@Override
public NodeKind getKind() {
Expand Down

0 comments on commit 5409e2e

Please sign in to comment.