Skip to content

Commit

Permalink
Merge pull request #42353 from HindujaB/worker_new
Browse files Browse the repository at this point in the history
Refactor worker message passing
  • Loading branch information
lochana-chathura authored Mar 20, 2024
2 parents 7185566 + cc01a89 commit 4eb341f
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,70 +67,74 @@ public Object receiveDataMultipleChannels(Strand strand, ReceiveField[] receiveF
}
for (ReceiveField field : receiveFields) {
WorkerDataChannel channel = getWorkerDataChannel(field.channelName());
if (!channel.isClosed()) {
Object result = channel.tryTakeData(strand, true);
checkAndPopulateResult(strand, field, result, channel);
} else {
if (channel.getState() == WorkerDataChannel.State.AUTO_CLOSED) {
checkAndPopulateResult(strand, field, ErrorUtils.createNoMessageError(field.channelName()),
channel);
}
WorkerDataChannel.State state = channel.getState();
Object result = null;
switch (state) {
case OPEN:
result = channel.tryTakeData(strand, true);
break;
case AUTO_CLOSED:
result = ErrorUtils.createNoMessageError(field.channelName());
break;
case CLOSED:
continue;
}
checkAndPopulateResult(strand, field, result, channel);
}
return clearResultCache(strand, receiveFields);
}

private void checkAndPopulateResult(Strand strand, ReceiveField field, Object result, WorkerDataChannel channel) {
if (result != null) {
result = getResultValue(result);
strand.workerReceiveMap.populateInitialValue(StringUtils.fromString(field.fieldName()), result);
channel.close();
++strand.channelCount;
} else {
if (result == null) {
strand.setState(BLOCK_AND_YIELD);
return;
}
result = getResultValue(result);
strand.workerReceiveMap.populateInitialValue(StringUtils.fromString(field.fieldName()), result);
channel.close();
++strand.channelCount;
}

private Object clearResultCache(Strand strand, ReceiveField[] receiveFields) {
if (strand.channelCount == receiveFields.length) {
BMap<BString, Object> map = strand.workerReceiveMap;
strand.workerReceiveMap = null;
strand.channelCount = 0;
strand.setState(State.RUNNABLE);
return map;
if (strand.channelCount != receiveFields.length) {
return null;
}
return null;
BMap<BString, Object> map = strand.workerReceiveMap;
strand.workerReceiveMap = null;
strand.channelCount = 0;
strand.setState(State.RUNNABLE);
return map;
}

public Object receiveDataAlternateChannels(Strand strand, String[] channels) throws Throwable {
Object result = null;
boolean allChannelsClosed = true;
for (String channelName : channels) {
WorkerDataChannel channel = getWorkerDataChannel(channelName);
if (!channel.isClosed()) {
WorkerDataChannel.State state = channel.getState();
if (state == WorkerDataChannel.State.OPEN) {
allChannelsClosed = false;
result = channel.tryTakeData(strand, true);
if (result != null) {
result = handleNonNullResult(channels, result, channel);
}
} else {
if (channel.getState() == WorkerDataChannel.State.AUTO_CLOSED) {
errors.add((ErrorValue) ErrorUtils.createNoMessageError(channelName));
}
result = handleResultForOpenChannel(strand, channels, channel);
} else if (state == WorkerDataChannel.State.AUTO_CLOSED) {
errors.add((ErrorValue) ErrorUtils.createNoMessageError(channelName));
}
}
return processResulAndError(strand, channels, result, allChannelsClosed);
}

private Object handleNonNullResult(String[] channels, Object result, WorkerDataChannel channel) {
private Object handleResultForOpenChannel(Strand strand, String[] channels, WorkerDataChannel channel)
throws Throwable {
Object result = channel.tryTakeData(strand, true);
if (result == null) {
return null;
}
Object resultValue = getResultValue(result);
if (resultValue instanceof ErrorValue errorValue) {
errors.add(errorValue);
channel.close();
result = null;
} else {
closeChannels(channels);
return null;
}
closeChannels(channels);
return result;
}

Expand Down Expand Up @@ -158,14 +162,15 @@ private void closeChannels(String[] channels) {
for (String channelName : channels) {
WorkerDataChannel channel = getWorkerDataChannel(channelName);
channel.close();
channel.setRemovable();
channel.callCount = 2;
}
}

public synchronized void removeCompletedChannels(Strand strand, String channelName) {
if (this.wDChannels != null) {
WorkerDataChannel channel = this.wDChannels.get(channelName);
if (channel != null && (channel.callCount == 2 || channel.isRemovable)) {
// callCount is incremented to 2 when the message passing is completed.
if (channel != null && channel.callCount == 2) {
this.wDChannels.remove(channelName);
strand.channelDetails.remove(new ChannelDetails(channelName, true, false));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ public class WorkerDataChannel {

protected String chnlName;
protected int callCount = 0;
protected boolean isRemovable;

@SuppressWarnings("rawtypes")
private final Queue<WorkerResult> channel = new LinkedList<>();
Expand Down Expand Up @@ -95,10 +94,6 @@ public State getState() {
return this.state;
}

public void setRemovable() {
this.isRemovable = true;
}

public enum State {
OPEN, AUTO_CLOSED, CLOSED
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ public class ProgramFileConstants {

public static final int MAGIC_NUMBER = 0xBA1DA4CE;
public static final short VERSION_NUMBER = 50;
public static final int BIR_VERSION_NUMBER = 71;
public static final short MIN_SUPPORTED_VERSION = 71;
public static final short MAX_SUPPORTED_VERSION = 71;
public static final int BIR_VERSION_NUMBER = 72;
public static final short MIN_SUPPORTED_VERSION = 72;
public static final short MAX_SUPPORTED_VERSION = 72;

// todo move this to a proper place
public static final String[] SUPPORTED_PLATFORMS = {"java17", "java11"};
Expand Down

0 comments on commit 4eb341f

Please sign in to comment.