Consuming Server-Sent Events (SSE) without losing data using Scala and Akka

I want to consume SSE events without losing any data when the rate of production exceeds the rate of consumption. Since SSE supports backpressure, Akka should be able to do this. I tried a few different approaches, but the extra messages are still being dropped.


@Singleton
class SseConsumer @Inject()()(implicit ec: ExecutionContext) {

  implicit val system = ActorSystem()

  val send: HttpRequest => Future[HttpResponse] = foo

  def foo(x: HttpRequest) = {
    try {
      val authHeader = Authorization(BasicHttpCredentials("user", "pass"))
      val newHeaders = x.withHeaders(authHeader)
      Http().singleRequest(newHeaders)
    } catch {
      case e: Exception => {
        println("Exceptio12n", e.printStackTrace())
        throw e
      }
    }
  }


  val eventSource2: Source[ServerSentEvent, NotUsed] =
    EventSource(
      uri = Uri("https://xyz/a/events/user"),
      send,
      initialLastEventId = Some("2"),
      retryDelay = 1.second
    )


  def orderStatusEventStable() = {
    val events: Future[immutable.Seq[ServerSentEvent]] =
      eventSource2
        .throttle(elements = 1, per = 3000.milliseconds, maximumBurst = 1, ThrottleMode.Shaping)
        .take(5)
        .runWith(Sink.seq)
    events.map(_.foreach(x => {
      // TODO: push to sqs
      println("456")
      println(x.data)
    }))
  }


  Future {
    blocking {
      while (true) {
        try {
          Await.result(orderStatusEventStable() recover {
            case e: Exception => {
              println("exception", e)
              throw e
            }
          }, Duration.Inf)
        } catch {
          case e: Exception => {
            println("Exception", e.printStackTrace())
          }
        }
      }
    }
  }
}

This code works, but it has the following problems:

  1. Because of .take(5), I drop events whenever the rate of consumption is lower than the rate of production.
  2. I want to process each message as it arrives rather than waiting until 5 messages have been received. How can I do that?
  3. I have to run the consumer in a while loop. This does not seem event-based; it looks more like polling (very similar to calling GET with pagination and a limit of 5).
  4. I am not sure about throttling. I tried reading the docs, but they are very confusing. If I don't want to lose any events, is throttling the right approach? I am expecting a rate of 5000 req/sec in peak hours and 10 req/sec otherwise. When the production rate is high, I would ideally want to apply backpressure. Is throttling the correct approach for that? According to the docs it seems so, since they say throttle "Backpressures when downstream backpressures or the incoming rate is higher than the speed limit".

For Akka Streams backpressure to work, you have to run a single source once, rather than emulating polling by starting a new one on every iteration.

Drop the loop and the orderStatusEventStable method.

Just do something like this, once:

eventSource2
  .operator(event => /* do something */ )
  .otherOperatorMaybe()
  ...
  .runWith(Sink.foreach(println))

Here operator and otherOperatorMaybe stand for whichever Akka Streams operators fit what you want to achieve (like throttle and take in your original code).
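
To make the "run it once" idea concrete, here is a minimal sketch, assuming Akka 2.6 or later (so an implicit ActorSystem is enough to materialize a stream). The names SingleStreamDemo, consume, runDemo and slowHandle are hypothetical, and Source(1 to 20) stands in for eventSource2 so the sketch runs without a server. mapAsync(1) requests the next element only after the Future for the previous one has completed, so a slow handler backpressures the source instead of dropping anything, and each element is handled as soon as it arrives.

```scala
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object SingleStreamDemo {

  // Materializes ONE stream and keeps it running until the source completes.
  // No take(n) and no outer loop: demand flows upstream only as fast as
  // `handle` finishes, which is what produces the backpressure.
  def consume[A](events: Source[A, NotUsed])(handle: A => Future[Unit])(
      implicit system: ActorSystem): Future[Done] =
    events
      .mapAsync(parallelism = 1)(handle)
      .runWith(Sink.ignore)

  // Demo with a fast in-memory source and a deliberately slow handler.
  def runDemo(): Vector[Int] = {
    implicit val system: ActorSystem = ActorSystem("single-stream-demo")
    import system.dispatcher

    val seen = new AtomicReference(Vector.empty[Int])

    // Hypothetical stand-in for the real work (for example pushing to SQS):
    // completes after 20 ms without blocking a thread.
    def slowHandle(n: Int): Future[Unit] =
      akka.pattern.after(20.millis, system.scheduler)(Future {
        seen.updateAndGet(v => v :+ n)
        ()
      })

    try {
      Await.result(consume(Source(1 to 20))(slowHandle), 30.seconds)
      seen.get()
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(runDemo())
}
```

In the original class this would amount to something like consume(eventSource2)(event => pushToSqs(event.data)), called once at startup, where pushToSqs is a hypothetical function returning a Future[Unit].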

List of operators: https://doc.akka.io/docs/akka/current/stream/operators/index.html
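
On the throttling question: throttle with ThrottleMode.Shaping delays elements and backpressures upstream when they arrive faster than the limit; it does not discard them. The sketch below illustrates that with an in-memory source, again assuming Akka 2.6 or later. ThrottleDemo and the rate of 5 elements per 100 ms are made up for the demonstration, not a recommendation for the real workload.

```scala
import akka.actor.ActorSystem
import akka.stream.ThrottleMode
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object ThrottleDemo {

  // Pushes 10 elements through a throttle that lets 5 through per 100 ms.
  // Shaping mode pauses emission to respect the rate; elements above the
  // rate wait upstream rather than being thrown away.
  def run(): Vector[Int] = {
    implicit val system: ActorSystem = ActorSystem("throttle-demo")
    try {
      val result = Source(1 to 10)
        .throttle(elements = 5, per = 100.millis, maximumBurst = 5, mode = ThrottleMode.Shaping)
        .runWith(Sink.seq)
      Await.result(result, 10.seconds).toVector
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(run())
}
```

If the only goal is to avoid overwhelming the consumer, a throttle may not be needed at all: a slow downstream stage already backpressures on its own. Throttle is mainly useful when a downstream system imposes a fixed rate limit.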

Akka Streams is powerful, but it takes some time to learn.