Skip to content

Commit

Permalink
Merge pull request #31 from agelostsal/push-auth-h
Browse files Browse the repository at this point in the history
ARGO-2686 ams: support header authentication on remote for push messages
  • Loading branch information
kaggis authored Nov 18, 2020
2 parents ece8b63 + a453c0d commit c4f8d11
Show file tree
Hide file tree
Showing 10 changed files with 132 additions and 83 deletions.
79 changes: 45 additions & 34 deletions api/v1/grpc/proto/ams.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions api/v1/grpc/proto/ams.proto
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ message PushConfig {
int64 max_messages = 3;
// Required. Retry policy.
RetryPolicy retry_policy = 2;
// Required. Authorization header that the sent messages should include into the request
string authorization_header = 4;
}

// RetryPolicy holds information regarding the retry policy.
Expand Down
19 changes: 13 additions & 6 deletions api/v1/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (ps *PushService) ActivateSubscription(ctx context.Context, r *amsPb.Activa
c, _ := consumers.New(consumers.AmsHttpConsumerType, r.Subscription.FullName, ps.Cfg, ps.Client)

// choose a sender
s, _ := senders.New(senders.HttpSenderType, r.Subscription.PushConfig.PushEndpoint, ps.Client)
s, _ := senders.New(senders.HttpSenderType, *r.Subscription.PushConfig, ps.Client)

worker, err := push.New(r.Subscription, c, s, ps.deactivateChan)
if err != nil {
Expand Down Expand Up @@ -305,8 +305,9 @@ func (ps *PushService) loadSubscriptions() {
FullName: sub.FullName,
FullTopic: sub.FullTopic,
PushConfig: &amsPb.PushConfig{
PushEndpoint: sub.PushCfg.Pend,
MaxMessages: sub.PushCfg.MaxMessages,
PushEndpoint: sub.PushCfg.Pend,
AuthorizationHeader: sub.PushCfg.AuthorizationHeader.Value,
MaxMessages: sub.PushCfg.MaxMessages,
RetryPolicy: &amsPb.RetryPolicy{
Period: sub.PushCfg.RetPol.Period,
Type: sub.PushCfg.RetPol.PolicyType,
Expand Down Expand Up @@ -443,9 +444,15 @@ type Subscription struct {

// PushConfig holds optional configuration for push operations
type PushConfig struct {
Pend string `json:"pushEndpoint"`
MaxMessages int64 `json:"maxMessages"`
RetPol RetryPolicy `json:"retryPolicy"`
Pend string `json:"pushEndpoint"`
AuthorizationHeader AuthorizationHeader `json:"authorization_header"`
MaxMessages int64 `json:"maxMessages"`
RetPol RetryPolicy `json:"retryPolicy"`
}

// AuthorizationHeader holds an optional value to be supplied as an Authorization header to push requests
type AuthorizationHeader struct {
Value string `json:"value"`
}

// RetryPolicy holds information on retry policies
Expand Down
17 changes: 13 additions & 4 deletions api/v1/grpc/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,9 +288,14 @@ func (suite *ServerTestSuite) TestGetSubscription() {
Period: 300,
}

authz := AuthorizationHeader{
Value: "auth-header-1",
}

pc := PushConfig{
Pend: "example.com:9999",
RetPol: rp,
Pend: "example.com:9999",
AuthorizationHeader: authz,
RetPol: rp,
}

expectedSub := Subscription{
Expand Down Expand Up @@ -397,10 +402,14 @@ func (m *MockRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
PolicyType: "linear",
Period: 300,
}
authz := AuthorizationHeader{
Value: "auth-header-1",
}

pc := PushConfig{
Pend: "example.com:9999",
RetPol: rp,
Pend: "example.com:9999",
AuthorizationHeader: authz,
RetPol: rp,
}

s := Subscription{
Expand Down
2 changes: 1 addition & 1 deletion push/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type WorkerTestSuite struct {
func (suite *WorkerTestSuite) TestNew() {

c := consumers.NewAmsHttpConsumer("", "", "", &http.Client{})
s := senders.NewHttpSender("", &http.Client{})
s := senders.NewHttpSender("", "", &http.Client{})
sub := &amsPb.Subscription{
PushConfig: &amsPb.PushConfig{
MaxMessages: 1,
Expand Down
11 changes: 8 additions & 3 deletions senders/httpsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@ const ApplicationJson = "application/json"

// HttpSender delivers data to any http endpoint
type HttpSender struct {
client *http.Client
endpoint string
client *http.Client
endpoint string
authZHeader string
}

// NewHttpSender initialises and returns a new http sender
func NewHttpSender(endpoint string, client *http.Client) *HttpSender {
func NewHttpSender(endpoint, authz string, client *http.Client) *HttpSender {
s := new(HttpSender)
s.client = client
s.endpoint = endpoint
s.authZHeader = authz
return s
}

Expand Down Expand Up @@ -50,6 +52,9 @@ func (s *HttpSender) Send(ctx context.Context, msgs PushMsgs, format pushMessage
}

req.Header.Set("Content-Type", ApplicationJson)
if s.authZHeader != "" {
req.Header.Set("Authorization", s.authZHeader)
}

log.WithFields(
log.Fields{
Expand Down
15 changes: 8 additions & 7 deletions senders/httpsender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ type HttpSenderTestSuite struct {
// TestNewHttpSender tests the proper initialisation of an http sender
func (suite *HttpSenderTestSuite) TestNewHttpSender() {

s := NewHttpSender("example.com:443", new(http.Client))
s := NewHttpSender("example.com:443", "auth-header-1", new(http.Client))

suite.Equal("example.com:443", s.endpoint)
suite.Equal("auth-header-1", s.authZHeader)
suite.Equal(new(http.Client), s.client)
}

Expand All @@ -33,7 +34,7 @@ func (suite *HttpSenderTestSuite) TestSend() {
}

// test the normal case of 200
s1 := NewHttpSender("https://example.com:8080/receive_here_200", client)
s1 := NewHttpSender("https://example.com:8080/receive_here_200", "auth-header-1", client)
m1 := PushMsg{Sub: "sub"}
m1s := PushMsgs{Messages: []PushMsg{m1}}
e1 := s1.Send(context.Background(), m1s, MultipleMessageFormat)
Expand All @@ -60,22 +61,22 @@ func (suite *HttpSenderTestSuite) TestSend() {
suite.False(ok)

// test the normal case of 201
s2 := NewHttpSender("https://example.com:8080/receive_here_201", client)
s2 := NewHttpSender("https://example.com:8080/receive_here_201", "", client)
e2 := s2.Send(context.Background(), PushMsgs{}, MultipleMessageFormat)
suite.Nil(e2)

// test the normal case of 204
s3 := NewHttpSender("https://example.com:8080/receive_here_204", client)
s3 := NewHttpSender("https://example.com:8080/receive_here_204", "auth-header-1", client)
e3 := s3.Send(context.Background(), PushMsgs{}, MultipleMessageFormat)
suite.Nil(e3)

// test the normal case of 102
s4 := NewHttpSender("https://example.com:8080/receive_here_102", client)
s4 := NewHttpSender("https://example.com:8080/receive_here_102", "auth-header-1", client)
e4 := s4.Send(context.Background(), PushMsgs{}, MultipleMessageFormat)
suite.Nil(e4)

// test the error case
s5 := NewHttpSender("https://example.com:8080/receive_here_error", client)
s5 := NewHttpSender("https://example.com:8080/receive_here_error", "", client)
e5 := s5.Send(context.Background(), PushMsgs{}, MultipleMessageFormat)

expOut := `{
Expand All @@ -90,7 +91,7 @@ func (suite *HttpSenderTestSuite) TestSend() {
}

func (suite *HttpSenderTestSuite) TestDestination() {
s := NewHttpSender("example.com:443", nil)
s := NewHttpSender("example.com:443", "auth-header-1", nil)
suite.Equal("example.com:443", s.Destination())
}

Expand Down
56 changes: 32 additions & 24 deletions senders/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,36 +52,44 @@ func (m *MockSenderRoundTripper) RoundTrip(r *http.Request) (*http.Response, err
switch r.URL.Path {

case "/receive_here_200":
resp = &http.Response{
StatusCode: 200,
// Send response to be tested
Body: ioutil.NopCloser(strings.NewReader("")),
// Must be set to non-nil value or it panics
Header: header,
if r.Header.Get("authorization") == "auth-header-1" {
resp = &http.Response{
StatusCode: 200,
// Send response to be tested
Body: ioutil.NopCloser(strings.NewReader("")),
// Must be set to non-nil value or it panics
Header: header,
}
}
case "/receive_here_201":
resp = &http.Response{
StatusCode: 201,
// Send response to be tested
Body: ioutil.NopCloser(strings.NewReader("")),
// Must be set to non-nil value or it panics
Header: header,
if r.Header.Get("authorization") == "" {
resp = &http.Response{
StatusCode: 201,
// Send response to be tested
Body: ioutil.NopCloser(strings.NewReader("")),
// Must be set to non-nil value or it panics
Header: header,
}
}
case "/receive_here_204":
resp = &http.Response{
StatusCode: 204,
// Send response to be tested
Body: ioutil.NopCloser(strings.NewReader("")),
// Must be set to non-nil value or it panics
Header: header,
if r.Header.Get("authorization") == "auth-header-1" {
resp = &http.Response{
StatusCode: 204,
// Send response to be tested
Body: ioutil.NopCloser(strings.NewReader("")),
// Must be set to non-nil value or it panics
Header: header,
}
}
case "/receive_here_102":
resp = &http.Response{
StatusCode: 102,
// Send response to be tested
Body: ioutil.NopCloser(strings.NewReader("")),
// Must be set to non-nil value or it panics
Header: header,
if r.Header.Get("authorization") == "auth-header-1" {
resp = &http.Response{
StatusCode: 102,
// Send response to be tested
Body: ioutil.NopCloser(strings.NewReader("")),
// Must be set to non-nil value or it panics
Header: header,
}
}
case "/receive_here_error":

Expand Down
5 changes: 3 additions & 2 deletions senders/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package senders
import (
"context"
"fmt"
amsPb "github.com/ARGOeu/ams-push-server/api/v1/grpc/proto"
"github.com/ARGOeu/ams-push-server/consumers"
"net/http"
)
Expand All @@ -26,11 +27,11 @@ type Sender interface {
}

// New acts as a sender factory, creates and returns a new sender based on the provided type
func New(sType senderType, endpoint string, client *http.Client) (Sender, error) {
func New(sType senderType, cfg amsPb.PushConfig, client *http.Client) (Sender, error) {

switch sType {
case HttpSenderType:
return NewHttpSender(endpoint, client), nil
return NewHttpSender(cfg.PushEndpoint, cfg.AuthorizationHeader, client), nil
}

return nil, fmt.Errorf("sender %v not yet implemented", sType)
Expand Down
Loading

0 comments on commit c4f8d11

Please sign in to comment.