Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor worker message passing #42353

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,70 +67,74 @@ public Object receiveDataMultipleChannels(Strand strand, ReceiveField[] receiveF
}
for (ReceiveField field : receiveFields) {
WorkerDataChannel channel = getWorkerDataChannel(field.channelName());
if (!channel.isClosed()) {
Object result = channel.tryTakeData(strand, true);
checkAndPopulateResult(strand, field, result, channel);
} else {
if (channel.getState() == WorkerDataChannel.State.AUTO_CLOSED) {
checkAndPopulateResult(strand, field, ErrorUtils.createNoMessageError(field.channelName()),
channel);
}
WorkerDataChannel.State state = channel.getState();
Object result = null;
switch (state) {
case OPEN:
result = channel.tryTakeData(strand, true);
break;
case AUTO_CLOSED:
result = ErrorUtils.createNoMessageError(field.channelName());
break;
case CLOSED:
continue;
}
checkAndPopulateResult(strand, field, result, channel);
}
return clearResultCache(strand, receiveFields);
}

private void checkAndPopulateResult(Strand strand, ReceiveField field, Object result, WorkerDataChannel channel) {
if (result != null) {
result = getResultValue(result);
strand.workerReceiveMap.populateInitialValue(StringUtils.fromString(field.fieldName()), result);
channel.close();
++strand.channelCount;
} else {
if (result == null) {
strand.setState(BLOCK_AND_YIELD);
return;
}
result = getResultValue(result);
strand.workerReceiveMap.populateInitialValue(StringUtils.fromString(field.fieldName()), result);
channel.close();
++strand.channelCount;
}

private Object clearResultCache(Strand strand, ReceiveField[] receiveFields) {
if (strand.channelCount == receiveFields.length) {
BMap<BString, Object> map = strand.workerReceiveMap;
strand.workerReceiveMap = null;
strand.channelCount = 0;
strand.setState(State.RUNNABLE);
return map;
if (strand.channelCount != receiveFields.length) {
return null;
}
return null;
BMap<BString, Object> map = strand.workerReceiveMap;
strand.workerReceiveMap = null;
strand.channelCount = 0;
strand.setState(State.RUNNABLE);
return map;
}

public Object receiveDataAlternateChannels(Strand strand, String[] channels) throws Throwable {
Object result = null;
boolean allChannelsClosed = true;
for (String channelName : channels) {
WorkerDataChannel channel = getWorkerDataChannel(channelName);
if (!channel.isClosed()) {
WorkerDataChannel.State state = channel.getState();
if (state == WorkerDataChannel.State.OPEN) {
allChannelsClosed = false;
result = channel.tryTakeData(strand, true);
if (result != null) {
result = handleNonNullResult(channels, result, channel);
}
} else {
if (channel.getState() == WorkerDataChannel.State.AUTO_CLOSED) {
errors.add((ErrorValue) ErrorUtils.createNoMessageError(channelName));
}
result = handleResultForOpenChannel(strand, channels, channel);
} else if (state == WorkerDataChannel.State.AUTO_CLOSED) {
errors.add((ErrorValue) ErrorUtils.createNoMessageError(channelName));
}
}
return processResulAndError(strand, channels, result, allChannelsClosed);
}

private Object handleNonNullResult(String[] channels, Object result, WorkerDataChannel channel) {
private Object handleResultForOpenChannel(Strand strand, String[] channels, WorkerDataChannel channel)
throws Throwable {
Object result = channel.tryTakeData(strand, true);
if (result == null) {
return null;
}
Object resultValue = getResultValue(result);
if (resultValue instanceof ErrorValue errorValue) {
errors.add(errorValue);
channel.close();
result = null;
} else {
closeChannels(channels);
return null;
}
closeChannels(channels);
return result;
}

Expand Down Expand Up @@ -158,14 +162,15 @@ private void closeChannels(String[] channels) {
for (String channelName : channels) {
WorkerDataChannel channel = getWorkerDataChannel(channelName);
channel.close();
channel.setRemovable();
channel.callCount = 2;
}
}

public synchronized void removeCompletedChannels(Strand strand, String channelName) {
if (this.wDChannels != null) {
WorkerDataChannel channel = this.wDChannels.get(channelName);
if (channel != null && (channel.callCount == 2 || channel.isRemovable)) {
// callCount is incremented to 2 when the message passing is completed.
if (channel != null && channel.callCount == 2) {
this.wDChannels.remove(channelName);
strand.channelDetails.remove(new ChannelDetails(channelName, true, false));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ public class WorkerDataChannel {

protected String chnlName;
protected int callCount = 0;
protected boolean isRemovable;

@SuppressWarnings("rawtypes")
private final Queue<WorkerResult> channel = new LinkedList<>();
Expand Down Expand Up @@ -95,10 +94,6 @@ public State getState() {
return this.state;
}

public void setRemovable() {
this.isRemovable = true;
}

public enum State {
OPEN, AUTO_CLOSED, CLOSED
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ public class ProgramFileConstants {

public static final int MAGIC_NUMBER = 0xBA1DA4CE;
public static final short VERSION_NUMBER = 50;
public static final int BIR_VERSION_NUMBER = 71;
public static final short MIN_SUPPORTED_VERSION = 71;
public static final short MAX_SUPPORTED_VERSION = 71;
public static final int BIR_VERSION_NUMBER = 72;
public static final short MIN_SUPPORTED_VERSION = 72;
public static final short MAX_SUPPORTED_VERSION = 72;

// todo move this to a proper place
public static final String[] SUPPORTED_PLATFORMS = {"java17", "java11"};
Expand Down