Consuming Server Sent Events(SSE) in scala play framework with automatic reconnect

Solution for Consuming Server Sent Events(SSE) in scala play framework with automatic reconnect
is Given Below:

How can we consume SSE in scala play framework? Most of the resources that I could find were to make an SSE source. I want to reliably listen to SSE events from other services (with autoconnect). The most relevant article was https://doc.akka.io/docs/alpakka/current/sse.html . I implemented this but this does not seem to work (code below). Also the event that I am su

@Singleton
class SseConsumer @Inject()((implicit ec: ExecutionContext) {

   implicit val system = ActorSystem()

  val send: HttpRequest => Future[HttpResponse] =  foo
  def foo(x:HttpRequest) = {
    try {
      println("foo")
      val authHeader = Authorization(BasicHttpCredentials("user", "pass"))
      val newHeaders = x.withHeaders(authHeader)
      Http().singleRequest(newHeaders)

    }catch {
      case e:Exception => {
        println("Exception", e.printStackTrace())
        throw e
      }
    }
  }

  
  val eventSource: Source[ServerSentEvent, NotUsed] =
    EventSource(
      uri = Uri("https://abc/v1/events"),
      send,
      initialLastEventId = Some("2"),
      retryDelay = 1.second
    )


  def orderStatusEventStable() = {
    val events: Future[immutable.Seq[ServerSentEvent]] =
      eventSource
        .throttle(elements = 1, per = 500.milliseconds, maximumBurst = 1, ThrottleMode.Shaping)
        .take(10)
        .runWith(Sink.seq)
    events.map(_.foreach( x => {
      println("456")
      println(x.data)
    }))
  }



  Future {
    blocking{
      while(true){
        try{
          Thread.sleep(2000)
          orderStatusEventStable()
        } catch {
          case e:Exception => {
            println("Exception", e.printStackTrace())
          }
        }
      }
    }
  }



}

This does not give any exceptions and println(“456”) is never printed.

EDIT:

Future {
    blocking {
      while(true){
        try{
          Await.result(orderStatusEventStable() recover {
            case e: Exception => {
              println("exception", e)
              throw e
            }
          }, Duration.Inf)
        } catch {
          case e:Exception => {
            println("Exception", e.printStackTrace())
          }
        }
      }
    }
  }

Added an await and it started working. Able to read 10 messages at a time. But now I am faced with another problem.
I have a producer which can at times produce faster than I can consume and with this code I have 2 issues:

  1. I have to wait until 10 messages are available. How can we take a max. of 10 and a min. of 0 messages?
  2. When the production rate > consumption rate, I am missing few events. I am guessing this is due to throttling. How do we handle it using backpressure?

The issue in your code is that the events: Future would only complete when the stream (eventSource) completes.

I’m not familiar with SSE but the stream likely never completes in your case as it’s always listening for new events.

You can learn more in Akka Stream documentation.

Depending on what you want to do with the events, you could just map on the stream like:

eventSource
  ...
  .map(/* do something */)
  .runWith(...)

Basically, you need to work with the Akka Stream Source as data is going through it but don’t wait for its completion.

EDIT: I didn’t notice the take(10), my answer applies only if the take was not here. Your code should work after 10 events sent.