Skip to content

Commit

Permalink
Merge pull request #614 from ripienaar/msg_ttl
Browse files Browse the repository at this point in the history
Update msg ttl for latest server code
  • Loading branch information
ripienaar authored Jan 21, 2025
2 parents 9085607 + 1189ef1 commit 0e9a68a
Show file tree
Hide file tree
Showing 21 changed files with 138 additions and 91 deletions.
6 changes: 4 additions & 2 deletions api/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,8 +583,10 @@ type StreamConfig struct {
// AllowMsgTTL allows header initiated per-message TTLs. If disabled,
// then the `NATS-TTL` header will be ignored.
AllowMsgTTL bool `json:"allow_msg_ttl,omitempty" yaml:"allow_msg_ttl"`
// LimitsTTL activates writing of messages when limits are applied with a specific TTL.
LimitsTTL time.Duration `json:"limits_ttl,omitempty" yaml:"limits_ttl"`
// Enables placing markers in the stream for certain message delete operations
SubjectDeleteMarkers bool `json:"subject_delete_markers,omitempty"`
// When placing a marker, how long should it be valid, defaults to 15m
SubjectDeleteMarkerTTL string `json:"subject_delete_marker_ttl,omitempty"`
// The following defaults will apply to consumers when created against
// this stream, unless overridden manually. They also represent the maximum values that
// these properties may have
Expand Down
11 changes: 8 additions & 3 deletions schema_source/jetstream/api/v1/definitions.json
Original file line number Diff line number Diff line change
Expand Up @@ -1210,9 +1210,14 @@
"type": "boolean",
"default": false
},
"limits_ttl": {
"description": "Writes messages into the stream when certain limits are applied, using this ttl",
"$ref": "#/definitions/golang_duration_nanos"
"subject_delete_markers": {
"description": "Enables placing markers in the stream for certain message delete operations",
"type": "boolean",
"default": false
},
"subject_delete_marker_ttl": {
"description": "When placing a marker, how long should it be valid, defaults to 15m",
"type": "string"
}
}
},
Expand Down
2 changes: 1 addition & 1 deletion schemas/jetstream/api/v1/consumer_configuration.json
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@
"maximum": 9223372036854775807
},
"max_deliver": {
"description": "The number of times a message will be redelivered to consumers if not acknowledged in time",
"description": "The number of times a message will be delivered to consumers if not acknowledged in time",
"default": -1,
"$comment": "integer with a dynamic bit size depending on the platform the cluster runs on, can be up to 64bit",
"type": "integer",
Expand Down
2 changes: 1 addition & 1 deletion schemas/jetstream/api/v1/consumer_create_request.json
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@
"maximum": 9223372036854775807
},
"max_deliver": {
"description": "The number of times a message will be redelivered to consumers if not acknowledged in time",
"description": "The number of times a message will be delivered to consumers if not acknowledged in time",
"default": -1,
"$comment": "integer with a dynamic bit size depending on the platform the cluster runs on, can be up to 64bit",
"type": "integer",
Expand Down
2 changes: 1 addition & 1 deletion schemas/jetstream/api/v1/consumer_create_response.json
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@
"maximum": 9223372036854775807
},
"max_deliver": {
"description": "The number of times a message will be redelivered to consumers if not acknowledged in time",
"description": "The number of times a message will be delivered to consumers if not acknowledged in time",
"default": -1,
"$comment": "integer with a dynamic bit size depending on the platform the cluster runs on, can be up to 64bit",
"type": "integer",
Expand Down
2 changes: 1 addition & 1 deletion schemas/jetstream/api/v1/consumer_info_response.json
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@
"maximum": 9223372036854775807
},
"max_deliver": {
"description": "The number of times a message will be redelivered to consumers if not acknowledged in time",
"description": "The number of times a message will be delivered to consumers if not acknowledged in time",
"default": -1,
"$comment": "integer with a dynamic bit size depending on the platform the cluster runs on, can be up to 64bit",
"type": "integer",
Expand Down
2 changes: 1 addition & 1 deletion schemas/jetstream/api/v1/consumer_list_response.json
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@
"maximum": 9223372036854775807
},
"max_deliver": {
"description": "The number of times a message will be redelivered to consumers if not acknowledged in time",
"description": "The number of times a message will be delivered to consumers if not acknowledged in time",
"default": -1,
"$comment": "integer with a dynamic bit size depending on the platform the cluster runs on, can be up to 64bit",
"type": "integer",
Expand Down
14 changes: 8 additions & 6 deletions schemas/jetstream/api/v1/stream_configuration.json
Original file line number Diff line number Diff line change
Expand Up @@ -434,12 +434,14 @@
"type": "boolean",
"default": false
},
"limits_ttl": {
"description": "Writes messages into the stream when certain limits are applied, using this ttl",
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
"maximum": 9223372036854775807,
"minimum": -9223372036854775807
"subject_delete_markers": {
"description": "Enables placing markers in the stream for certain message delete operations",
"type": "boolean",
"default": false
},
"subject_delete_marker_ttl": {
"description": "When placing a marker, how long should it be valid, defaults to 15m",
"type": "string"
}
}
}
14 changes: 8 additions & 6 deletions schemas/jetstream/api/v1/stream_create_request.json
Original file line number Diff line number Diff line change
Expand Up @@ -437,12 +437,14 @@
"type": "boolean",
"default": false
},
"limits_ttl": {
"description": "Writes messages into the stream when certain limits are applied, using this ttl",
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
"maximum": 9223372036854775807,
"minimum": -9223372036854775807
"subject_delete_markers": {
"description": "Enables placing markers in the stream for certain message delete operations",
"type": "boolean",
"default": false
},
"subject_delete_marker_ttl": {
"description": "When placing a marker, how long should it be valid, defaults to 15m",
"type": "string"
}
}
},
Expand Down
14 changes: 8 additions & 6 deletions schemas/jetstream/api/v1/stream_create_response.json
Original file line number Diff line number Diff line change
Expand Up @@ -452,12 +452,14 @@
"type": "boolean",
"default": false
},
"limits_ttl": {
"description": "Writes messages into the stream when certain limits are applied, using this ttl",
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
"maximum": 9223372036854775807,
"minimum": -9223372036854775807
"subject_delete_markers": {
"description": "Enables placing markers in the stream for certain message delete operations",
"type": "boolean",
"default": false
},
"subject_delete_marker_ttl": {
"description": "When placing a marker, how long should it be valid, defaults to 15m",
"type": "string"
}
}
}
Expand Down
14 changes: 8 additions & 6 deletions schemas/jetstream/api/v1/stream_info_response.json
Original file line number Diff line number Diff line change
Expand Up @@ -452,12 +452,14 @@
"type": "boolean",
"default": false
},
"limits_ttl": {
"description": "Writes messages into the stream when certain limits are applied, using this ttl",
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
"maximum": 9223372036854775807,
"minimum": -9223372036854775807
"subject_delete_markers": {
"description": "Enables placing markers in the stream for certain message delete operations",
"type": "boolean",
"default": false
},
"subject_delete_marker_ttl": {
"description": "When placing a marker, how long should it be valid, defaults to 15m",
"type": "string"
}
}
}
Expand Down
14 changes: 8 additions & 6 deletions schemas/jetstream/api/v1/stream_list_response.json
Original file line number Diff line number Diff line change
Expand Up @@ -517,12 +517,14 @@
"type": "boolean",
"default": false
},
"limits_ttl": {
"description": "Writes messages into the stream when certain limits are applied, using this ttl",
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
"maximum": 9223372036854775807,
"minimum": -9223372036854775807
"subject_delete_markers": {
"description": "Enables placing markers in the stream for certain message delete operations",
"type": "boolean",
"default": false
},
"subject_delete_marker_ttl": {
"description": "When placing a marker, how long should it be valid, defaults to 15m",
"type": "string"
}
}
}
Expand Down
14 changes: 8 additions & 6 deletions schemas/jetstream/api/v1/stream_restore_request.json
Original file line number Diff line number Diff line change
Expand Up @@ -441,12 +441,14 @@
"type": "boolean",
"default": false
},
"limits_ttl": {
"description": "Writes messages into the stream when certain limits are applied, using this ttl",
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
"maximum": 9223372036854775807,
"minimum": -9223372036854775807
"subject_delete_markers": {
"description": "Enables placing markers in the stream for certain message delete operations",
"type": "boolean",
"default": false
},
"subject_delete_marker_ttl": {
"description": "When placing a marker, how long should it be valid, defaults to 15m",
"type": "string"
}
}
},
Expand Down
14 changes: 8 additions & 6 deletions schemas/jetstream/api/v1/stream_snapshot_response.json
Original file line number Diff line number Diff line change
Expand Up @@ -476,12 +476,14 @@
"type": "boolean",
"default": false
},
"limits_ttl": {
"description": "Writes messages into the stream when certain limits are applied, using this ttl",
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
"maximum": 9223372036854775807,
"minimum": -9223372036854775807
"subject_delete_markers": {
"description": "Enables placing markers in the stream for certain message delete operations",
"type": "boolean",
"default": false
},
"subject_delete_marker_ttl": {
"description": "When placing a marker, how long should it be valid, defaults to 15m",
"type": "string"
}
}
},
Expand Down
14 changes: 8 additions & 6 deletions schemas/jetstream/api/v1/stream_template_configuration.json
Original file line number Diff line number Diff line change
Expand Up @@ -457,12 +457,14 @@
"type": "boolean",
"default": false
},
"limits_ttl": {
"description": "Writes messages into the stream when certain limits are applied, using this ttl",
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
"maximum": 9223372036854775807,
"minimum": -9223372036854775807
"subject_delete_markers": {
"description": "Enables placing markers in the stream for certain message delete operations",
"type": "boolean",
"default": false
},
"subject_delete_marker_ttl": {
"description": "When placing a marker, how long should it be valid, defaults to 15m",
"type": "string"
}
}
}
Expand Down
14 changes: 8 additions & 6 deletions schemas/jetstream/api/v1/stream_template_create_request.json
Original file line number Diff line number Diff line change
Expand Up @@ -451,12 +451,14 @@
"type": "boolean",
"default": false
},
"limits_ttl": {
"description": "Writes messages into the stream when certain limits are applied, using this ttl",
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
"maximum": 9223372036854775807,
"minimum": -9223372036854775807
"subject_delete_markers": {
"description": "Enables placing markers in the stream for certain message delete operations",
"type": "boolean",
"default": false
},
"subject_delete_marker_ttl": {
"description": "When placing a marker, how long should it be valid, defaults to 15m",
"type": "string"
}
}
},
Expand Down
14 changes: 8 additions & 6 deletions schemas/jetstream/api/v1/stream_template_create_response.json
Original file line number Diff line number Diff line change
Expand Up @@ -461,12 +461,14 @@
"type": "boolean",
"default": false
},
"limits_ttl": {
"description": "Writes messages into the stream when certain limits are applied, using this ttl",
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
"maximum": 9223372036854775807,
"minimum": -9223372036854775807
"subject_delete_markers": {
"description": "Enables placing markers in the stream for certain message delete operations",
"type": "boolean",
"default": false
},
"subject_delete_marker_ttl": {
"description": "When placing a marker, how long should it be valid, defaults to 15m",
"type": "string"
}
}
},
Expand Down
14 changes: 8 additions & 6 deletions schemas/jetstream/api/v1/stream_template_info_response.json
Original file line number Diff line number Diff line change
Expand Up @@ -461,12 +461,14 @@
"type": "boolean",
"default": false
},
"limits_ttl": {
"description": "Writes messages into the stream when certain limits are applied, using this ttl",
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
"maximum": 9223372036854775807,
"minimum": -9223372036854775807
"subject_delete_markers": {
"description": "Enables placing markers in the stream for certain message delete operations",
"type": "boolean",
"default": false
},
"subject_delete_marker_ttl": {
"description": "When placing a marker, how long should it be valid, defaults to 15m",
"type": "string"
}
}
},
Expand Down
14 changes: 8 additions & 6 deletions schemas/jetstream/api/v1/stream_update_request.json
Original file line number Diff line number Diff line change
Expand Up @@ -437,12 +437,14 @@
"type": "boolean",
"default": false
},
"limits_ttl": {
"description": "Writes messages into the stream when certain limits are applied, using this ttl",
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
"maximum": 9223372036854775807,
"minimum": -9223372036854775807
"subject_delete_markers": {
"description": "Enables placing markers in the stream for certain message delete operations",
"type": "boolean",
"default": false
},
"subject_delete_marker_ttl": {
"description": "When placing a marker, how long should it be valid, defaults to 15m",
"type": "string"
}
}
}
Expand Down
14 changes: 8 additions & 6 deletions schemas/jetstream/api/v1/stream_update_response.json
Original file line number Diff line number Diff line change
Expand Up @@ -452,12 +452,14 @@
"type": "boolean",
"default": false
},
"limits_ttl": {
"description": "Writes messages into the stream when certain limits are applied, using this ttl",
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
"maximum": 9223372036854775807,
"minimum": -9223372036854775807
"subject_delete_markers": {
"description": "Enables placing markers in the stream for certain message delete operations",
"type": "boolean",
"default": false
},
"subject_delete_marker_ttl": {
"description": "When placing a marker, how long should it be valid, defaults to 15m",
"type": "string"
}
}
}
Expand Down
20 changes: 17 additions & 3 deletions streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,10 +545,23 @@ func AllowMsgTTL() StreamOption {
}
}

func LimitsAppliedTTL(ttl time.Duration) StreamOption {
func AllowSubjectDeleteMarkers() StreamOption {
return func(o *api.StreamConfig) error {
o.AllowMsgTTL = true
o.LimitsTTL = ttl
o.SubjectDeleteMarkers = true
return nil
}
}

func SubjectDeleteMarkerTTL(d string) StreamOption {
return func(o *api.StreamConfig) error {
err := AllowSubjectDeleteMarkers()(o)
if err != nil {
return err
}

o.SubjectDeleteMarkerTTL = d

return nil
}
}
Expand Down Expand Up @@ -1132,5 +1145,6 @@ func (s *Stream) Metadata() map[string]string { return s.cfg.Metada
func (s *Stream) Compression() api.Compression { return s.cfg.Compression }
func (s *Stream) FirstSequence() uint64 { return s.cfg.FirstSeq }
func (s *Stream) AllowMsgTTL() bool { return s.cfg.AllowMsgTTL }
func (s *Stream) LimitsTTL() time.Duration { return s.cfg.LimitsTTL }
func (s *Stream) SubjectDeleteMarkers() bool { return s.cfg.SubjectDeleteMarkers }
func (s *Stream) SubjectDeleteMarkerTTL() string { return s.cfg.SubjectDeleteMarkerTTL }
func (s *Stream) ConsumerLimits() api.StreamConsumerLimits { return s.cfg.ConsumerLimits }

0 comments on commit 0e9a68a

Please sign in to comment.