Sqs.IO in Apache Beam and Session Credentials

Solution for Sqs.IO in Apache Beam and Session Credentials
is Given Below:

I would like to access AWS SQS with short lived credentials from an Apache Beam Pipleline.

In AWS IAM I have created a role with the following trust relationship:

{
  "Effect": "Allow",
  "Principal": {
    "AWS": "arn:aws:sts::xxxxxx:assumed-role/gcp_role/gcp-project-session-name",
    "Service": "sqs.amazonaws.com"
  },
  "Action": "sts:AssumeRole"
},

With this role I am able to access SQS from my local machine.
I used AWS BasicSessionCredentials as followed:

  BasicSessionCredentials refreshedAWSCredentials = new BasicSessionCredentials(
            refreshedCredentials.getAccessKeyId(),
            refreshedCredentials.getSecretAccessKey(),
            refreshedCredentials.getSessionToken());

    AWSSecurityTokenService service = AWSSecurityTokenServiceClientBuilder.standard()
            .withCredentials(new AWSStaticCredentialsProvider(refreshedAWSCredentials))
            .withRegion(options.getAwsRegion()).build();

I add the credentials object to the pipeline options:

     options.setAwsSessionToken(refreshedAWSCredentials.getSessionToken());
     options.setAwsCredentialsProvider(new AWSStaticCredentialsProvider(refreshedAWSCredentials));      
    return Pipeline.create(options); 

At the end I always run into the following error:

Caused by: org.apache.beam.sdk.util.UserCodeException: com.amazonaws.services.sqs.model.AmazonSQSException:
The security token included in the request is invalid. (Service: AmazonSQS; Status Code: 403; Error Code:
InvalidClientTokenId; Request ID: 501e9869-ea58-5e80-9ec1-c1exxxx; Proxy: null

I assume that the AWSStaticCredentialsProvider does not know about the AWS_SECRET_TOKEN.
That’s why I setup a STSAssumeRoleSessionCredentialsProvider which should be work with temporary credentials

STSAssumeRoleSessionCredentialsProvider stsSessionProvider = new STSAssumeRoleSessionCredentialsProvider
            .Builder(awsRoleArn, awsRoleSession)
            .withStsClient(service)
            .build();

This is the associated pipeline code

p.apply(SqsIO.read().withQueueUrl(options.getSourceQueueUrl())
            .withMaxNumRecords(options.getNumberOfRecords()))
            .apply(ParDo.of(new SqsMessageToJson()))
            .apply(TextIO.write()
                    .to(options.getDestinationBucketUrl() + "/purchase_intent/")
                    .withSuffix(".json"));

Even if I used the above provider which worked locally as well, I got the sam exception shown above. So, I am wondering how to setup SqsIO with temp credentials.
Any help would be very appreciated,

best wishes Joern