How to monitor number of RXJS subscriptions?

You could achieve it using defer to track subscriptions and finalize to track completions, e.g. as an operator:

// a custom operator that will count number of subscribers
function customOperator(onCountUpdate = noop) {
  return function refCountOperatorFunction(source$) {
    let counter = 0;

    return defer(()=>{
      counter++;
      onCountUpdate(counter);
      return source$;
    })
    .pipe(
      finalize(()=>{
        counter--;
        onCountUpdate(counter);
      })
    );
  };
}

// just a stub for `onCountUpdate`
function noop(){}

And then use it like:

const source$ = new Subject();

const result$ = source$.pipe(
  customOperator( n => console.log('Count updated: ', n) )
);

Heres a code snippet illustrating this:

const { Subject, of, timer, pipe, defer } = rxjs;
const { finalize, takeUntil } = rxjs.operators;


const source$ = new Subject();

const result$ = source$.pipe(
  customOperator( n => console.log('Count updated: ', n) )
);

// emit events
setTimeout(()=>{
  source$.next('one');
}, 250);

setTimeout(()=>{
  source$.next('two');
}, 1000);

setTimeout(()=>{
  source$.next('three');
}, 1250);

setTimeout(()=>{
  source$.next('four');
}, 1750);


// subscribe and unsubscribe
const subscriptionA = result$
  .subscribe(value => console.log('A', value));

setTimeout(()=>{
  result$.subscribe(value => console.log('B', value));
}, 500);


setTimeout(()=>{
  result$.subscribe(value => console.log('C', value));
}, 1000);

setTimeout(()=>{
  subscriptionA.unsubscribe();
}, 1500);


// complete source
setTimeout(()=>{
  source$.complete();
}, 2000);


function customOperator(onCountUpdate = noop) {
  return function refCountOperatorFunction(source$) {
    let counter = 0;

    return defer(()=>{
      counter++;
      onCountUpdate(counter);
      return source$;
    })
    .pipe(
      finalize(()=>{
        counter--;
        onCountUpdate(counter);
      })
    );
  };
}

function noop(){}
<script src="https://unpkg.com/[email protected]/bundles/rxjs.umd.min.js"></script>

* NOTE: if your source$ is cold — you might need to share it.

Hope it helps

You are really asking three separate questions here, and I question whether you really need the full capability that you mention. Since most of the resource managment stuff you are asking for is already provided for by the library, doing custom tracking code seems to be redundant. The first two questions:

  • Allocate global resource when the number of subscriptions becomes greater than 0
  • Release global resource when the number of subscriptions becomes 0

Can be done with the using + share operators:

class ExpensiveResource {
  constructor () {
    // Do construction
  }
  unsubscribe () {
   // Do Tear down
  }
}

// Creates a resource and ties its lifecycle with that of the created `Observable`
// generated by the second factory function
// Using will accept anything that is "Subscription-like" meaning it has a unsubscribe function.
const sharedStream$ = using(
  // Creates an expensive resource
  () => new ExpensiveResource(), 
  // Passes that expensive resource to an Observable factory function
  er => timer(1000)
)
// Share the underlying source so that global creation and deletion are only
// processed when the subscriber count changes between 0 and 1 (or visa versa)
.pipe(share())

After that sharedStream$ can be passed around as a base stream which will manage the underlying resource (assuming you implemented your unsubscribe correctly) so that the resource will be created and torn down as the number of subscribers transitions between 0 and 1.

  • Adjust the resource usage strategy based on the number of subscriptions

    The third question I am most dubious on, but I’ll answer it for completeness assuming you know your application better than I do (since I can’t think of a reason why you would need specific handling at different usage levels other than going between 0 and 1).

Basically I would use a similar approach as above but I would encapuslate the transition logic slightly differently.

// Same as above
class ExpensiveResource {
  unsubscribe() {  console.log('Tear down this resource!')}
}

const usingReferenceTracking = 
  (onUp, onDown) => (resourceFactory, streamFactory) => {
    let instance, refCount = 0
    // Again manage the global resource state with using
    const r$ = using(
      // Unfortunately the using pattern doesn't let the resource escape the closure
      // so we need to cache it for ourselves to use later
      () => instance || (instance = resourceFactory()),
      // Forward stream creation as normal
      streamFactory
      )
    ).pipe(
      // Don't forget to clean up the stream after all is said and done
      // Because its behind a share this should only happen when all subscribers unsubscribe
      finalize(() => instance = null)
      share()
    )
    // Use defer to trigger "onSubscribe" side-effects
    // Note as well that these side-effects could be merged with the above for improved performance
    // But I prefer them separate for easier maintenance.
    return defer(() => onUp(instance, refCount += 1) || r$)
      // Use finalize to handle the "onFinish" side-effects
      .pipe(finalize(() => onDown(instance, refCount -= 1)))

}

const referenceTracked$ = usingReferenceTracking(
  (ref, count) => console.log('Ref count increased to ' + count),
  (ref, count) => console.log('Ref count decreased to ' + count)
)(
  () => new ExpensiveResource(),
  ref => timer(1000)
)

referenceTracked$.take(1).subscribe(x => console.log('Sub1 ' +x))
referenceTracked$.take(1).subscribe(x => console.log('Sub2 ' +x))


// Ref count increased to 1
// Ref count increased to 2
// Sub1 0
// Ref count decreased to 1
// Sub2 0
// Ref count decreased to 0
// Tear down this resource!

Warning: One side effect of this is that by definition the stream will be warm once it leaves the usingReferenceTracking function, and it will go hot on first subscription. Make sure you take this into account during the subscription phase.

What a fun problem! If I am understanding what you are asking, here is my solution to this: create a wrapper class around Observable that tracks the subscriptions by intercepting both subscribe() and unsubscribe(). Here is the wrapper class:

export class CountSubsObservable<T> extends Observable<T>{
  private _subCount = 0;
  private _subCount$: BehaviorSubject<number> = new BehaviorSubject(0);
  public subCount$ = this._subCount$.asObservable();

  constructor(public source: Observable<T>) {
    super();
  }

  subscribe(
    observerOrNext?: PartialObserver<T> | ((value: T) => void), 
    error?: (error: any) => void, 
    complete?: () => void 
  ): Subscription {
    this._subCount++;
    this._subCount$.next(this._subCount);
    let subscription = super.subscribe(observerOrNext as any, error, complete);
    const newUnsub: () => void = () => {
      if (this._subCount > 0) {
        this._subCount--;
        this._subCount$.next(this._subCount);
        subscription.unsubscribe();
      }
    }
    subscription.unsubscribe = newUnsub;
    return subscription;
  }
}

This wrapper creates a secondary observable .subCount$ that can be subscribed to which will emit every time the number of subscriptions to the source observable changes. It will emit a number corresponding to the current number of subscribers.

To use it you would create a source observable and then call new with this class to create the wrapper. For example:

const source$ = interval(1000).pipe(take(10));

const myEvent$: CountSubsObservable<number> = new CountSubsObservable(source$);

myEvent$.subCount$.subscribe(numSubs => {
  console.log('subCount$ notification! Number of subscriptions is now', numSubs);
  if(numSubs === 0) {
    // release global resource
  } else {
    // allocate global resource, if not yet allocated
  }
  // for a scalable resource usage / load,
  // re-configure it, based on numSubs
});

source$.subscribe(result => console.log('result is ', result));

To see it in use, check out this Stackblitz.

UPDATE:

Ok, as mentioned in the comments, I’m struggling a little to understand where the stream of data is coming from. Looking back through your question, I see you are providing an “event subscription interface”. If the stream of data is a stream of CustomType as you detail in your third update above, then you may want to use fromEvent() from rxjs to create the source observable with which you would call the wrapper class I provided.

To show this I created a new Stackblitz. From that Stackblitz here is the stream of CustomTypes and how I would use the CountedObservable class to achieve what you are looking for.

class CustomType {
  a: string;
}

const dataArray = [
  { a: 'January' },
  { a: 'February' },
  { a: 'March' },
  { a: 'April' },
  { a: 'May' },
  { a: 'June' },
  { a: 'July' },
  { a: 'August' },
  { a: 'September' },
  { a: 'October' },
  { a: 'November' },
  { a: 'December' }
] as CustomType[];

// Set up an arbitrary source that sends a stream of `CustomTypes`, one
// every two seconds by using `interval` and mapping the numbers into
// the associated dataArray.  
const source$ = interval(2000).pipe(
  map(i => dataArray[i]), // transform the Observable stream into CustomTypes
  take(dataArray.length),  // limit the Observable to only emit # array elements
  share() // turn into a hot Observable.
);

const myEvent$: CountedObservable<CustomType> = new CountedObservable(source$);

myEvent$.onCount.subscribe(newCount => {
  console.log('newCount notification! Number of subscriptions is now', newCount);
});

I hope this helps.