Skip to content

Commit

Permalink
add bench
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Jan 19, 2024
1 parent 703999e commit edeadd6
Showing 1 changed file with 127 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/*
* Copyright (C) 2014-2022 Lightbend Inc. <https://www.lightbend.com>
*/

package org.apache.pekko.stream

import com.typesafe.config.ConfigFactory
import org.apache.pekko
import org.apache.pekko.stream.ActorAttributes.SupervisionStrategy
import org.apache.pekko.stream.Attributes.SourceLocation
import org.apache.pekko.stream.impl.Stages.DefaultAttributes
import org.apache.pekko.stream.impl.fusing.Collect
import org.apache.pekko.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import org.openjdk.jmh.annotations._
import pekko.actor.ActorSystem
import pekko.stream.scaladsl._

import java.util.concurrent.TimeUnit
import scala.annotation.nowarn
import scala.concurrent._
import scala.concurrent.duration._
import scala.util.control.NonFatal

object CollectBenchmark {
final val OperationsPerInvocation = 10000000
}

@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.SECONDS)
@BenchmarkMode(Array(Mode.Throughput))
@nowarn("msg=deprecated")
class CollectBenchmark {
import CollectBenchmark._

private val config = ConfigFactory.parseString("""
akka.actor.default-dispatcher {
executor = "fork-join-executor"
fork-join-executor {
parallelism-factor = 1
}
}
""")

private implicit val system: ActorSystem = ActorSystem("CollectBenchmark", config)

@TearDown
def shutdown(): Unit = {
Await.result(system.terminate(), 5.seconds)
}

private val newCollect = Source
.repeat(1)
.via(new Collect({ case elem => elem }))
.take(OperationsPerInvocation)
.toMat(Sink.ignore)(Keep.right)

private val oldCollect = Source
.repeat(1)
.via(new SimpleCollect({ case elem => elem }))
.take(OperationsPerInvocation)
.toMat(Sink.ignore)(Keep.right)

private class SimpleCollect[In, Out](pf: PartialFunction[In, Out])
extends GraphStage[FlowShape[In, Out]] {
val in = Inlet[In]("SimpleCollect.in")
val out = Outlet[Out]("SimpleCollect.out")
override val shape = FlowShape(in, out)

override def initialAttributes: Attributes = DefaultAttributes.collect and SourceLocation.forLambda(pf)

def createLogic(inheritedAttributes: Attributes) =
new GraphStageLogic(shape) with InHandler with OutHandler {
private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
import Collect.NotApplied

override def onPush(): Unit =
try {
pf.applyOrElse(grab(in), NotApplied) match {
case NotApplied => pull(in)
case result: Out @unchecked => push(out, result)
case _ => throw new RuntimeException()
}
} catch {
case NonFatal(ex) =>
decider(ex) match {
case Supervision.Stop => failStage(ex)
case Supervision.Resume => if (!hasBeenPulled(in)) pull(in)
case Supervision.Restart => if (!hasBeenPulled(in)) pull(in)
}
}

override def onPull(): Unit = pull(in)

setHandlers(in, out, this)
}

override def toString = "SimpleCollect"
}

@Benchmark
@OperationsPerInvocation(OperationsPerInvocation)
def benchOldCollect(): Unit =
Await.result(oldCollect.run(), Duration.Inf)

@Benchmark
@OperationsPerInvocation(OperationsPerInvocation)
def benchNewCollect(): Unit =
Await.result(newCollect.run(), Duration.Inf)

}

0 comments on commit edeadd6

Please sign in to comment.