Spring Boot WebSocket URL Not Responding and RxJS Call Repetition?

Solution for Spring Boot WebSocket URL Not Responding and RxJS Call Repetition?
is Given Below:

I’m trying to follow a guide to WebSockets at https://www.devglan.com/spring-boot/spring-boot-angular-websocket

I’d like it to respond to ws://localhost:8448/wsb/softlayer-cost-file, but I’m sure I misunderstood something. I’d like to get it to receive a binary file and issue periodic updates as the file is being processed.

Questions are:

  • How come Spring does not respond to my requests despite all the multiple URLs I try (see below).
  • Does my RxJS call run once and then conclude, or does it keep running until some closure has happened? Sorry to ask what might be obvious to others.

On my Spring Boot Server start, I see no errors. After about 5-7 minutes of running, I saw the following log message:

INFO  o.s.w.s.c.WebSocketMessageBrokerStats - WebSocketSession[0 current WS(0)-HttpStream(0)-HttpPoll(0), 0 total, 0 closed abnormally (0 connect failure, 0 send limit, 0 transport error)], stompSubProtocol[processed CONNECT(0)-CONNECTED(0)-DISCONNECT(0)], stompBrokerRelay[null], inboundChannel[pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0], outboundChannel[pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0], sockJsScheduler[pool size = 6, active threads = 1, queued tasks = 0, completed tasks = 5]

I’ve pointed my browser at these URLs and can’t get the Spring Boot server to show any reaction:
ws://localhost:8448/app/message
ws://localhost:8448/greeting/app/message
ws://localhost:8448/topic
ws://localhost:8448/queue
(I got the initial request formed in Firefox, then clicked edit/resend to try again).

WebSocketConfig.java

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
    
    @Autowired
    CostFileUploadWebSocketHandler costFileUploadWebSocketHandler;

    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(new SocketTextHandler(), "/wst");
        registry.addHandler(costFileUploadWebSocketHandler, "/wsb/softlayer-cost-file");
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker("/topic/", "/queue/");
        config.setApplicationDestinationPrefixes("/app");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/greeting").setAllowedOrigins("*");
        // .withSockJS();
    }

}

CostFileUploadWebSocketHandler.java

@Component
public class CostFileUploadWebSocketHandler extends BinaryWebSocketHandler {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    private SoftLayerJobService softLayerJobService;
    private SoftLayerService softLayerService;
    private AuthenticationFacade authenticationFacade;
    
    @Autowired
    CostFileUploadWebSocketHandler(SoftLayerJobService softLayerJobService, SoftLayerService softLayerService,
            AuthenticationFacade authenticationFacade) {
        this.softLayerJobService = softLayerJobService;
        this.softLayerService = softLayerService;
        this.authenticationFacade = authenticationFacade;
    }

    Map<WebSocketSession, FileUploadInFlight> sessionToFileMap = new WeakHashMap<>();

    @Override
    public boolean supportsPartialMessages() {
        return true;
    }
    
    class WebSocketProgressReporter implements ProgressReporter {
        private WebSocketSession session;

        public WebSocketProgressReporter(WebSocketSession session) {
            this.session = session;
        }

        @Override
        public void reportCurrentProgress(BatchStatus currentBatchStatus, long currentPercentage) {
            try {
                session.sendMessage(new TextMessage("BatchStatus "+currentBatchStatus));
                session.sendMessage(new TextMessage("Percentage Complete "+currentPercentage));
            } catch(IOException e) {
                throw new RuntimeException(e);
            }
        }
        
    }

    @Override
    protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception {

        ByteBuffer payload = message.getPayload();
        FileUploadInFlight inflightUpload = sessionToFileMap.get(session);
        if (inflightUpload == null) {
            throw new IllegalStateException("This is not expected");
        }
        inflightUpload.append(payload);

        if (message.isLast()) {
            File fileNameSaved = save(inflightUpload.name, "websocket", inflightUpload.bos.toByteArray());
            BatchStatus currentBatchStatus = BatchStatus.UNKNOWN;
            long percentageComplete;
            
            ProgressReporter progressReporter = new WebSocketProgressReporter(session);
             
            SoftLayerCostFileJobExecutionThread softLayerCostFileJobExecutionThread = 
                    new SoftLayerCostFileJobExecutionThread(softLayerService, softLayerJobService, fileNameSaved,progressReporter);

            logger.info("In main thread about to begin separate thread");
            ForkJoinPool.commonPool().submit(softLayerCostFileJobExecutionThread);
            while(!softLayerCostFileJobExecutionThread.jobDone()); 
//          softLayerCostFileJobExecutionThread.run();
            // Wait for above to complete somehow

//            StepExecution foundStepExecution = jobExplorer.getJobExecution(
//                  jobExecutionThread.getJobExecutionResult().getJobExecution().getId()
//                  ).getStepExecutions().stream().filter(stepExecution->stepExecution.getStepName().equals("softlayerUploadFile")).findFirst().orElseGet(null);

//            if (!"COMPLETED".equals(jobExecutionResult.getExitStatus())) {
//                throw new UploadFileException(file.getOriginalFilename() + " exit status: " + jobExecutionResult.getExitStatus());
//            }
            logger.info("In main thread after separate thread submitted");
            

            session.sendMessage(new TextMessage("UPLOAD "+inflightUpload.name));
            session.close();
            sessionToFileMap.remove(session);
            logger.info("Uploaded "+inflightUpload.name);
        }
        String response = "Upload Chunk: size "+ payload.array().length;
        logger.debug(response);

    }
    
    private File save(String fileName, String prefix, byte[] data) throws IOException {
        Path basePath = Paths.get(".", "uploads", prefix, UUID.randomUUID().toString());
        logger.info("Saving incoming cost file "+fileName+" to "+basePath);
        Files.createDirectories(basePath);

        FileChannel channel =  new FileOutputStream(Paths.get(basePath.toString(), fileName).toFile(), false).getChannel();
        channel.write(ByteBuffer.wrap(data));
        channel.close();
        return new File(basePath.getFileName().toString());
    }
    

    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        sessionToFileMap.put(session, new FileUploadInFlight(session));
    }

    static class FileUploadInFlight {
        private final Logger logger = LoggerFactory.getLogger(this.getClass());
        String name;
        String uniqueUploadId;
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        /**
         * Fragile constructor - beware not prod ready
         * @param session
         */
        FileUploadInFlight(WebSocketSession session) {
            String query = session.getUri().getQuery();
            String uploadSessionIdBase64 = query.split("=")[1];
            String uploadSessionId = new String(Base64Utils.decodeUrlSafe(uploadSessionIdBase64.getBytes()));

            List<String> sessionIdentifiers = Splitter.on("\").splitToList(uploadSessionId);
            String uniqueUploadId = session.getRemoteAddress().toString()+sessionIdentifiers.get(0);
            String fileName = sessionIdentifiers.get(1);
            this.name = fileName;
            this.uniqueUploadId = uniqueUploadId;
            logger.info("Preparing upload for "+this.name+" uploadSessionId "+uploadSessionId);
        }
        public void append(ByteBuffer byteBuffer) throws IOException{
            bos.write(byteBuffer.array());
        }
    }
}

Below is a snippet of Angular code where I make the call to the websocket. The service is intended to receive a file, then provide regular updates of percentage complete until the service is completed. Does this call need to be in a loop, or does the socket run until it’s closed?

Angular Snippet of call to WebSocket:

      this.softlayerService.uploadBlueReportFile(this.blueReportFile)
        .subscribe(data => {
            this.showLoaderBlueReport = false;
            this.successBlueReport = true;
            this.blueReportFileName = "No file selected";
            this.responseBlueReport="File ".concat(data.fileName).concat(' ').concat('is ').concat(data.exitStatus);
            this.blueReportSelected = false;
            this.getCurrentUserFiles();
          },
          (error)=>{
            if(error.status === 504){
              this.showLoaderBlueReport = false;
              this.stillProcessing = true;
            }else{
              this.showLoaderBlueReport = false;
              this.displayUploadBlueReportsError(error, 'File upload failed');
            }
          });
    }