Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use pre-defined proxy if RequestMeta contains proxyId #187

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ trait HttpPartRequestHandler extends Handler {
AggregateRequestIdHeader -> partRequestInfo.requestMeta.id,
PartRequestIdHeader -> partRequestInfo.partRequestId,
PartIdHeader -> partRequestInfo.partRequest.partId
)
) ++ partRequestInfo.requestMeta.proxyId.map { case s => ProxyIdHeader -> s }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need a case here.

We should also make sure to test that the header is being properly forwarded (and left out if no proxy) in subsequent requests to backends from Octoparts.

Also, since this method is called buildTracingHeaders, we should probably do proxy id header adding in another method or rename this method.

}

/**
Expand Down Expand Up @@ -178,6 +178,7 @@ object HttpPartRequestHandler {
val AggregateRequestIdHeader = "X-OCTOPARTS-PARENT-REQUEST-ID"
val PartRequestIdHeader = "X-OCTOPARTS-REQUEST-ID"
val PartIdHeader = "X-OCTOPARTS-PART-ID"
val ProxyIdHeader = "X-OCTOPARTS-PROXY-ID"

/**
* A regex for matching "${...}" placeholders in strings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,9 @@ import scala.concurrent.ExecutionContext
class PartRequestService(
val repository: ConfigsRepository,
val handlerFactory: HttpHandlerFactory,
implicit val zipkinService: ZipkinServiceLike)(implicit val executionContext: ExecutionContext)
extends PartRequestServiceBase
implicit val zipkinService: ZipkinServiceLike,
val proxyDefinitions: Map[String, String] = Map())(implicit val executionContext: ExecutionContext)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's leave out the default argument. We don't instantiate one of these often and it makes it easier to spot bugs at compile time.

EDIT: actually, if we make proxy configs something that is retrievable from the repository this is unnecessary.

extends PartRequestServiceBase {

override def proxyDefinition(proxyId: String): Option[String] = proxyDefinitions.get(proxyId)
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ trait PartRequestServiceBase extends RequestParamSupport {

def handlerFactory: HttpHandlerFactory

def proxyDefinition(proxyId: String): Option[String] = None

/**
* The primary method responsible for processing a PartRequest into a Future[PartResponse]
*
Expand Down Expand Up @@ -84,7 +86,9 @@ trait PartRequestServiceBase extends RequestParamSupport {
* @return Future[PartResponse], which includes adding deprecation notices
*/
protected def processWithConfig(ci: HttpPartConfig, partRequestInfo: PartRequestInfo, params: Map[ShortPartParam, Seq[String]])(implicit parentSpan: Span): Future[PartResponse] = {
val handler = handlerFactory.makeHandler(ci)
val proxyOverride = partRequestInfo.requestMeta.proxyId.flatMap { case id => proxyDefinition(id) }
val proxyOverriddenConfig = ci.copy(httpProxy = proxyOverride orElse ci.httpProxy)
val handler = handlerFactory.makeHandler(proxyOverriddenConfig)
val fResp = handler.process(partRequestInfo, params)
fResp.map {
resp =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ trait RequestParamSupport {
"meta.serviceId" -> meta.serviceId,
"meta.sessionId" -> meta.sessionId,
"meta.userAgent" -> meta.userAgent,
"meta.userId" -> meta.userId
"meta.userId" -> meta.userId,
"meta.proxyId" -> meta.proxyId
)
value <- mbValue
} yield {
Expand Down
14 changes: 13 additions & 1 deletion app/com/m3/octoparts/wiring/AggregatorServicesModule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ trait AggregatorServicesModule extends RepositoriesModule with AggregatorHandler

private implicit lazy val ec = partsServiceContext

import scala.collection.JavaConverters._

lazy val partRequestServiceBase = new PartRequestService(
configsRepository,
httpHandlerFactory,
zipkinService
zipkinService,
proxyConfig
) with PartResponseCachingSupport with PartResponseLocalContentSupport {
val cacheOps = module.cacheOps
}
Expand All @@ -24,4 +27,13 @@ trait AggregatorServicesModule extends RepositoriesModule with AggregatorHandler
)
}

private[this] lazy val proxyConfig: Map[String, String] = {
val config = configuration.getConfigSeq("proxies")
val list = for {
proxy <- config.getOrElse(Seq())
id <- proxy.getString("id")
url <- proxy.getString("url")
} yield (id -> url)
Map(list: _*)
}
}
2 changes: 1 addition & 1 deletion app/views/part/test.scala.html
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
@(part: presentation.HttpPartConfigView)(implicit flash: Flash, navbarLinks: presentation.NavbarLinks, messages: Messages)

@metaProps = @{Seq("serviceId", "userId", "sessionId", "requestUrl", "userAgent")}
@metaProps = @{Seq("serviceId", "userId", "sessionId", "requestUrl", "userAgent", "proxyId")}

@requiredCls(paramView: presentation.ParamView)= {if (required) "required" else "optional"}

Expand Down
6 changes: 6 additions & 0 deletions conf/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -168,4 +168,10 @@ zipkin {
sampleRate = ${?OCTOPARTS_ZIPKIN_SAMPLE_RATE}
}

# Proxy Override
# The mapping from meta.proxyId (taken from X-OCTOPARTS-PROXY-ID HTTP header) to proxy URL
proxies = [
# { id: "proxy1", url: "http://proxy1:8080" }
]

play.modules.disabled += "play.api.db.DBModule"
6 changes: 6 additions & 0 deletions conf/application.dev.conf.sample
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,10 @@ caching {
inmemory = true
}

proxies = [
# The mapping from meta.proxyId (taken from X-OCTOPARTS-PROXY-ID HTTP header) to proxy URL
{ id: "proxy1", url: "http://proxy1:8080" },
{ id: "proxy2", url: "http://proxy2:8080" },
]

swagger.api.basepath="http://localhost:9000"
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,19 @@ class OctopartsApiBuilder(@Nonnull apiRootUrl: String, @Nullable serviceId: Stri
* @param userAgent optional user agent
* @param requestUrl optional request URL
* @param timeoutMs This value is enforced in the octoparts server.
* @param proxyId proxy Id defined in Octoparts' configuration, taken from HTTP header 'X-OCTOPARTS-PROXY-ID'
*/
def newRequest(@Nullable userId: String, @Nullable sessionId: String, @Nullable userAgent: String, @Nullable requestUrl: String, @Nullable timeoutMs: java.lang.Long): RequestBuilder = {
def newRequest(@Nullable userId: String, @Nullable sessionId: String, @Nullable userAgent: String, @Nullable requestUrl: String, @Nullable timeoutMs: java.lang.Long, @Nullable proxyId: String): RequestBuilder = {
val timeoutOpt = if (timeoutMs == null) None else Some(timeoutMs.longValue().millis)
val requestMeta = RequestMeta(UUID.randomUUID.toString, Option(serviceId), Option(userId), Option(sessionId), Option(requestUrl), Option(userAgent), timeoutOpt)
val requestMeta = RequestMeta(UUID.randomUUID.toString, Option(serviceId), Option(userId), Option(sessionId), Option(requestUrl), Option(userAgent), timeoutOpt, Option(proxyId))
new RequestBuilder(requestMeta)
}

/** for backward compatibility */
def newRequest(@Nullable userId: String, @Nullable sessionId: String, @Nullable userAgent: String, @Nullable requestUrl: String, @Nullable timeoutMs: java.lang.Long): RequestBuilder = {
newRequest(userId, sessionId, userAgent, requestUrl, timeoutMs, null);
}

@varargs private[client] def toHttp(aggregateRequest: AggregateRequest, additionalHeaders: (String, String)*) = {
Log.debug(s"${aggregateRequest.requests.size} octoparts")
val jsonContent = Mapper.writeValueAsBytes(aggregateRequest)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ class OctopartsApiBuilderTest extends FunSpec with BeforeAndAfterAll with Matche
JListWrapper(ningrequest.getHeaders.get("a")).toSeq shouldBe Seq("b")
}

it("should handle proxy-id meta") {
val request = apiBuilder.newRequest("123", "cafebabe", null, "/index.jsp", 456L, "test1")
request.build.requestMeta.proxyId should be(Some("test1"))
}

it("should escape var arguments, handling nulls") {
OctopartsApiBuilder.formatWithUriEscape("%s", " ") should be("%20")
OctopartsApiBuilder.formatWithUriEscape("%s%s", " ", " ") should be("%20%20")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ case class RequestMeta(
@(ApiModelProperty @field)(required = false, dataType = "string")@BeanProperty sessionId: Option[String] = None,
@(ApiModelProperty @field)(required = false, dataType = "string")@BeanProperty requestUrl: Option[String] = None,
@(ApiModelProperty @field)(required = false, dataType = "string")@BeanProperty userAgent: Option[String] = None,
@(ApiModelProperty @field)(required = false, dataType = "integer", value = "in ms")@BeanProperty timeout: Option[FiniteDuration] = None)
@(ApiModelProperty @field)(required = false, dataType = "integer", value = "in ms")@BeanProperty timeout: Option[FiniteDuration] = None,
@(ApiModelProperty @field)(required = false, dataType = "String")@BeanProperty proxyId: Option[String] = None)

/**
* A request for a given part. One of more of these can be combined into a single AggregateRequest.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package com.m3.octoparts.aggregator.service

import java.util.concurrent.TimeUnit

import com.beachape.logging.LTSVLogger
import com.beachape.zipkin.services.NoopZipkinService
import com.m3.octoparts.aggregator.PartRequestInfo
import com.m3.octoparts.model._
import com.m3.octoparts.model.config.{ HttpPartConfig, PartParam }
import com.m3.octoparts.model.config.{ Charset, HttpPartConfig, PartParam }
import com.m3.octoparts.repository.ConfigsRepository
import com.m3.octoparts.support.mocks.HandlerMocks
import com.twitter.zipkin.gen.Span
import org.joda.time.{ DateTime }
import org.mockito.Matchers.{ eq => mockitoEq, _ }
import org.mockito.Mockito._
import org.scalatest._
Expand All @@ -15,6 +19,7 @@ import org.scalatest.mock.MockitoSugar

import scala.collection.SortedSet
import scala.concurrent.Future
import scala.concurrent.duration.Duration

class PartRequestServiceSpec
extends FunSpec
Expand All @@ -26,16 +31,63 @@ class PartRequestServiceSpec

implicit val emptySpan = new Span()
val repository = mock[ConfigsRepository]
val config = mock[HttpPartConfig]
doReturn(SortedSet.empty[PartParam]).when(config).parameters
doReturn(None).when(config).deprecatedInFavourOf
doReturn("123").when(config).partId
val config = HttpPartConfig(
Some(1), "123", "owner", Some("description"), "uri", HttpMethod.Get,
SortedSet.empty, 1, Duration(1, TimeUnit.SECONDS), Duration(1, TimeUnit.SECONDS), Charset.forName("UTF-8"), None, SortedSet.empty,
None, None, SortedSet.empty, Some(Duration.Zero), false,
None, None, Duration(1, TimeUnit.SECONDS), None, false, None, DateTime.now, DateTime.now)

val proxyDef = Map("test1" -> "http://mrkuntest1:8888", "test2" -> "http://mrkuntest2:8888")

def pReq(partId: String) = PartRequestInfo(RequestMeta("hi"), PartRequest(partId = partId, id = Some("myId")))
def pReqWithProxy(partId: String, proxy: Option[String]) = PartRequestInfo(RequestMeta("hi", proxyId = proxy), PartRequest(partId = partId, id = Some("myId")))

describe("#responseFor") {

describe("when given a proxy Id") {
val service = new PartRequestService(repository, mockVoidHttpProxyHandlerFactory, NoopZipkinService, proxyDef)
val proxyConfig = config.copy(httpProxy = Some("default"))

it("should not use proxy when no proxy-id is set in both of requestMeta and partConfig") {
doReturn(Future.successful(Some(config))).when(repository).findConfigByPartId(mockitoEq("123"))(anyObject[Span])
whenReady(service.responseFor(pReq("123"))) { r =>
r.contents should be(Some("it worked with proxy None"))
}
}

it("should not override proxy when no proxy-id is set in requestMeta") {
val proxyConfig = config.copy(httpProxy = Some("default"))
doReturn(Future.successful(Some(proxyConfig))).when(repository).findConfigByPartId(mockitoEq("123"))(anyObject[Span])
whenReady(service.responseFor(pReq("123"))) { r =>
r.contents should be(Some("it worked with proxy Some(default)"))
}
}

it("should return a Future[PartResponse] that is filled by proxy response") {
doReturn(Future.successful(Some(config))).when(repository).findConfigByPartId(mockitoEq("123"))(anyObject[Span])
whenReady(service.responseFor(pReqWithProxy("123", Some("test1")))) { r =>
r.contents should be(Some("it worked with proxy Some(http://mrkuntest1:8888)"))
}
}

it("should override proxy setting in partconfig") {
doReturn(Future.successful(Some(proxyConfig))).when(repository).findConfigByPartId(mockitoEq("123"))(anyObject[Span])
whenReady(service.responseFor(pReqWithProxy("123", Some("test1")))) { r =>
r.contents should be(Some("it worked with proxy Some(http://mrkuntest1:8888)"))
}
}

it("should not override proxy when proxy-id is not found in configuration") {
doReturn(Future.successful(Some(config))).when(repository).findConfigByPartId(mockitoEq("123"))(anyObject[Span])
val pReqWithProxy = pReq("123").copy(requestMeta = RequestMeta("hi", proxyId = Some("unknown")))
whenReady(service.responseFor(pReqWithProxy)) { r =>
r.contents should be(Some("it worked with proxy None"))
}
}
}

describe("when given a PartRequest with a partId that is not supported") {
LTSVLogger.info("warming up logger... without this, 'testOnly' for this spec will fail by timeout of Future")
it("should return a Future[PartResponse] with an error that mentions that the part Id is not supported") {
val service = new PartRequestService(repository, mockVoidHttpHandlerFactory, NoopZipkinService)
doReturn(Future.successful(None)).when(repository).findConfigByPartId(anyObject[String]())(anyObject[Span])
Expand All @@ -59,8 +111,8 @@ class PartRequestServiceSpec
}
it("should return a Future[PartResponse] when given a part id for a config that has deprecatedInFavourOf filled in") {
val service = new PartRequestService(repository, mockVoidHttpHandlerFactory, NoopZipkinService)
doReturn(Some("helloWorldPart")).when(config).deprecatedInFavourOf
doReturn(Future.successful(Some(config))).when(repository).findConfigByPartId(mockitoEq("123"))(anyObject[Span])
val deprecatedConfig = config.copy(deprecatedInFavourOf = Some("helloWorldPart"))
doReturn(Future.successful(Some(deprecatedConfig))).when(repository).findConfigByPartId(mockitoEq("123"))(anyObject[Span])
whenReady(service.responseFor(pReq("123"))) { r =>
r.warnings should be(Seq(service.deprecationMsg("123", "helloWorldPart")))
r.contents should be(Some("it worked"))
Expand All @@ -82,15 +134,14 @@ class PartRequestServiceSpec
it("should return a Future with a PartResponse that contains that errors sequence") {
val service = new PartRequestService(repository, mockPartResponseWithErrorHttpHandlerFactory, NoopZipkinService)
doReturn(Future.successful(Some(config))).when(repository).findConfigByPartId(mockitoEq("123"))(anyObject[Span])
doReturn(None).when(config).deprecatedInFavourOf
whenReady(service.responseFor(pReq("123"))) { r =>
r.errors should be(Seq("SomeException"))
}
}
it("should return a Future with a PartResponse that contains that errors sequence and a deprecation warning if the dependency is deprecated") {
val service = new PartRequestService(repository, mockPartResponseWithErrorHttpHandlerFactory, NoopZipkinService)
doReturn(Future.successful(Some(config))).when(repository).findConfigByPartId(mockitoEq("123"))(anyObject[Span])
doReturn(Some("HelloWorld")).when(config).deprecatedInFavourOf
val deprecatedConfig = config.copy(deprecatedInFavourOf = Some("HelloWorld"))
doReturn(Future.successful(Some(deprecatedConfig))).when(repository).findConfigByPartId(mockitoEq("123"))(anyObject[Span])
whenReady(service.responseFor(pReq("123"))) { r =>
r.errors should be(Seq("SomeException"))
r.warnings should be(Seq(service.deprecationMsg("123", "HelloWorld")))
Expand Down
8 changes: 8 additions & 0 deletions test/com/m3/octoparts/support/mocks/HandlerMocks.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ trait HandlerMocks {
val partId = "voidHandler"
def process(pri: PartRequestInfo, args: HandlerArguments)(implicit span: Span) = Future.successful(PartResponse(partId, partId, contents = Some("it worked")))
}
def mockVoidProxyHandler(proxy: Option[String]) = new Handler {
val partId = "voidProxyHandler"
def process(pri: PartRequestInfo, args: HandlerArguments)(implicit span: Span) = Future.successful(PartResponse(partId, partId, contents = Some(s"it worked with proxy ${proxy}")))
}
val mockErrorHandler = new Handler {
val partId = "errorHandler"
def process(pri: PartRequestInfo, args: HandlerArguments)(implicit span: Span) = Future.failed(new RuntimeException)
Expand All @@ -29,6 +33,10 @@ trait HandlerMocks {
implicit val zipkinService = NoopZipkinService
override def makeHandler(ci: HttpPartConfig) = mockVoidHandler
}
val mockVoidHttpProxyHandlerFactory = new HttpHandlerFactory {
implicit val zipkinService = NoopZipkinService
override def makeHandler(ci: HttpPartConfig) = mockVoidProxyHandler(ci.httpProxy)
}
val mockErrorHttpHandlerFactory = new HttpHandlerFactory {
implicit val zipkinService = NoopZipkinService
override def makeHandler(ci: HttpPartConfig) = mockErrorHandler
Expand Down