Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add initial filter classes #159

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.flipkart.varadhi.consumer.filtering;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@RequiredArgsConstructor
public class FilterProvider {


public static MessageFilter get() {
return new MessageFilter();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.flipkart.varadhi.consumer.filtering;

import com.flipkart.varadhi.entities.Message;

import java.util.function.Predicate;

public class MessageFilter implements Predicate<Message> {
@Override
public boolean test(Message message) {
return true;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.flipkart.varadhi.entities;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.flipkart.varadhi.entities.filter.FilterPolicy;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import lombok.Getter;
Expand Down Expand Up @@ -31,6 +32,8 @@ public class SubscriptionResource extends VersionedEntity implements Validatable
@NotNull
ConsumptionPolicy consumptionPolicy;

FilterPolicy filterPolicy;

private SubscriptionResource(
String name,
int version,
Expand All @@ -41,7 +44,8 @@ private SubscriptionResource(
boolean grouped,
Endpoint endpoint,
RetryPolicy retryPolicy,
ConsumptionPolicy consumptionPolicy
ConsumptionPolicy consumptionPolicy,
FilterPolicy filterPolicy
) {
super(name, version);
this.project = project;
Expand All @@ -52,6 +56,7 @@ private SubscriptionResource(
this.endpoint = endpoint;
this.retryPolicy = retryPolicy;
this.consumptionPolicy = consumptionPolicy;
this.filterPolicy = filterPolicy;
}

public static SubscriptionResource of(
Expand All @@ -63,11 +68,12 @@ public static SubscriptionResource of(
boolean grouped,
Endpoint endpoint,
RetryPolicy retryPolicy,
ConsumptionPolicy consumptionPolicy
ConsumptionPolicy consumptionPolicy,
FilterPolicy filterPolicy
) {
return new SubscriptionResource(
name, INITIAL_VERSION, project, topic, topicProject, description, grouped, endpoint, retryPolicy,
consumptionPolicy
consumptionPolicy, filterPolicy
);
}

Expand All @@ -93,7 +99,8 @@ public static SubscriptionResource from(VaradhiSubscription subscription) {
subscription.isGrouped(),
subscription.getEndpoint(),
subscription.getRetryPolicy(),
subscription.getConsumptionPolicy()
subscription.getConsumptionPolicy(),
subscription.getFilterPolicy()
);
subResource.setVersion(subscription.getVersion());
return subResource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.flipkart.varadhi.entities.filter.FilterPolicy;
import lombok.*;

@Getter
Expand All @@ -17,6 +18,7 @@ public class VaradhiSubscription extends MetaStoreEntity {
private ConsumptionPolicy consumptionPolicy;
private SubscriptionShards shards;
private Status status;
private FilterPolicy filterPolicy;


private VaradhiSubscription(
Expand All @@ -30,7 +32,8 @@ private VaradhiSubscription(
RetryPolicy retryPolicy,
ConsumptionPolicy consumptionPolicy,
SubscriptionShards shards,
Status status
Status status,
FilterPolicy filterPolicy
) {
super(name, version);
this.project = project;
Expand All @@ -45,6 +48,9 @@ private VaradhiSubscription(
}
this.shards = shards;
this.status = status;

// TODO(aayush): parse and validate filter policy
this.filterPolicy = filterPolicy;
}

public static VaradhiSubscription of(
Expand All @@ -56,11 +62,12 @@ public static VaradhiSubscription of(
Endpoint endpoint,
RetryPolicy retryPolicy,
ConsumptionPolicy consumptionPolicy,
SubscriptionShards shards
SubscriptionShards shards,
FilterPolicy filterPolicy
) {
return new VaradhiSubscription(
name, INITIAL_VERSION, project, topic, description, grouped, endpoint, retryPolicy, consumptionPolicy,
shards, new Status(State.Creating)
shards, new Status(State.Creating), filterPolicy
);
}

Expand Down Expand Up @@ -100,6 +107,7 @@ public enum State {
public static class Status {
String message;
State state;

public Status(State state) {
this.state = state;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.flipkart.varadhi.entities.filter;

import lombok.Data;

import java.util.List;

@Data
public class FilterExpression {
private final FilterOperation.BooleanOps op;
private final List<FilterExpression> exp;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.flipkart.varadhi.entities.filter;

public enum FilterOperation {
startsWith,
endsWith,
contains,
in,
exists;

public enum BooleanOps {
AND, OR, NOT;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.flipkart.varadhi.entities.filter;

import lombok.Data;

@Data
public class FilterPolicy {

/**
* Expression struct that describes the various conditions and subexpressions joined by binary or unary operators.
*/
private final FilterExpression filterExpression;
}
1 change: 1 addition & 0 deletions entities/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@
exports com.flipkart.varadhi.entities;
exports com.flipkart.varadhi.entities.cluster;
exports com.flipkart.varadhi.entities.auth;
exports com.flipkart.varadhi.entities.filter;
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ public VaradhiSubscription build(String name, String subProject, String subscrib
endpoint == null ? getHttpEndpoint() : endpoint,
retryPolicy == null ? getRetryPolicy() : retryPolicy,
consumptionPolicy == null ? getConsumptionPolicy() : consumptionPolicy,
shards
shards,
null //TODO(aayush): provide actual filter policy
);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.flipkart.varadhi.utils;

import com.flipkart.varadhi.entities.InternalQueueCategory;
import com.flipkart.varadhi.entities.*;
import com.flipkart.varadhi.spi.services.StorageSubscriptionFactory;
import com.flipkart.varadhi.spi.services.StorageTopicFactory;
Expand Down Expand Up @@ -49,7 +48,8 @@ public VaradhiSubscription get(SubscriptionResource subscriptionResource, Projec
subscriptionResource.getEndpoint(),
subscriptionResource.getRetryPolicy(),
subscriptionResource.getConsumptionPolicy(),
shards
shards,
subscriptionResource.getFilterPolicy()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import java.net.MalformedURLException;
import java.net.URI;
import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -141,7 +140,8 @@ void testSubscriptionEntitiesSerDe() {
false,
endpoint,
retryPolicy,
consumptionPolicy
consumptionPolicy,
null
);

VaradhiSubscriptionFactory factory = new VaradhiSubscriptionFactory(pts, psf, ptf, region);
Expand Down Expand Up @@ -171,7 +171,9 @@ void getSubscriptionListReturnsCorrectSubscriptions() {
subscriptionService.createSubscription(unGroupedTopic, sub1, o1t1p1);
subscriptionService.createSubscription(unGroupedTopic, sub2, o1t1p1);
subscriptionService.createSubscription(
unGroupedTopic, SubscriptionHandlersTest.getUngroupedSubscription("sub3", o1t1p2, unGroupedTopic), o1t1p2);
unGroupedTopic, SubscriptionHandlersTest.getUngroupedSubscription("sub3", o1t1p2, unGroupedTopic),
o1t1p2
);

List<String> actualSubscriptions = subscriptionService.getSubscriptionList(o1t1p1.getName());

Expand Down Expand Up @@ -263,7 +265,8 @@ void updateSubscriptionUpdatesCorrectly(VertxTestContext ctx) {
doReturn(unGroupedTopic).when(varadhiMetaStore).getTopic(unGroupedTopic.getName());
subscriptionService.createSubscription(unGroupedTopic, sub1, o1t1p1);
VaradhiSubscription update =
SubscriptionHandlersTest.getUngroupedSubscription(sub1.getName().split(NAME_SEPARATOR_REGEX)[1], o1t1p1, unGroupedTopic);
SubscriptionHandlersTest.getUngroupedSubscription(
sub1.getName().split(NAME_SEPARATOR_REGEX)[1], o1t1p1, unGroupedTopic);
update.setVersion(1);
CompletableFuture<SubscriptionStatus> status =
CompletableFuture.completedFuture(new SubscriptionStatus(update.getName(), SubscriptionState.STOPPED));
Expand All @@ -284,7 +287,8 @@ void updateSubscriptionWithVersionConflictThrows(VertxTestContext ctx) {
doReturn(unGroupedTopic).when(varadhiMetaStore).getTopic(unGroupedTopic.getName());
subscriptionService.createSubscription(unGroupedTopic, sub1, o1t1p1);
VaradhiSubscription update =
SubscriptionHandlersTest.getUngroupedSubscription(sub1.getName().split(NAME_SEPARATOR_REGEX)[1], o1t1p1, unGroupedTopic);
SubscriptionHandlersTest.getUngroupedSubscription(
sub1.getName().split(NAME_SEPARATOR_REGEX)[1], o1t1p1, unGroupedTopic);
update.setVersion(2);
CompletableFuture<SubscriptionStatus> status =
CompletableFuture.completedFuture(new SubscriptionStatus(update.getName(), SubscriptionState.STOPPED));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package com.flipkart.varadhi.web.admin;

import com.flipkart.varadhi.entities.*;
import com.flipkart.varadhi.entities.filter.FilterExpression;
import com.flipkart.varadhi.entities.filter.FilterOperation;
import com.flipkart.varadhi.entities.filter.FilterPolicy;
import com.flipkart.varadhi.exceptions.ResourceNotFoundException;
import com.flipkart.varadhi.services.ProjectService;
import com.flipkart.varadhi.services.SubscriptionService;
Expand Down Expand Up @@ -38,6 +41,8 @@ public class SubscriptionHandlersTest extends WebTestBase {
private static final ConsumptionPolicy consumptionPolicy = new ConsumptionPolicy(1, 1, false, 1, null);
private static final TopicCapacityPolicy capacityPolicy = new TopicCapacityPolicy(1, 10, 1);
private static final SubscriptionShards shards = new SubscriptionUnitShard(0, capacityPolicy, null, null, null);
private static final FilterPolicy filterPolicy =
new FilterPolicy(new FilterExpression(FilterOperation.BooleanOps.AND, List.of()));

private final Project project = Project.of("project1", "", "team1", "org1");
private final TopicResource topicResource = TopicResource.unGrouped("topic1", "project2", null);
Expand Down Expand Up @@ -71,7 +76,8 @@ private static VaradhiSubscription getSubscription(
endpoint,
retryPolicy,
consumptionPolicy,
shards
shards,
null
);
}

Expand Down Expand Up @@ -268,7 +274,8 @@ private SubscriptionResource getSubscriptionResource(
false,
endpoint,
retryPolicy,
consumptionPolicy
consumptionPolicy,
null
);
}

Expand Down
Loading