RxJS: takeUntil() Angular component's ngOnDestroy() - TagMerge
5RxJS: takeUntil() Angular component's ngOnDestroy()RxJS: takeUntil() Angular component's ngOnDestroy()

RxJS: takeUntil() Angular component's ngOnDestroy()

Asked 7 months ago
89
5 answers

You could leverage a ReplaySubject for that:

EDIT: Different since RxJS 6.x: Note the use of the pipe() method.

class myComponent {
  private destroyed$: ReplaySubject<boolean> = new ReplaySubject(1);

  constructor(
    private serviceA: ServiceA,
    private serviceB: ServiceB,
    private serviceC: ServiceC) {}

  ngOnInit() {
    this.serviceA
      .pipe(takeUntil(this.destroyed$))
      .subscribe(...);
    this.serviceB
      .pipe(takeUntil(this.destroyed$))
      .subscribe(...);
    this.serviceC
      .pipe(takeUntil(this.destroyed$))
      .subscribe(...);
  }

  ngOnDestroy() {
    this.destroyed$.next(true);
    this.destroyed$.complete();
  }
}

This is only valid for RxJS 5.x and older:

class myComponentOld {
  private destroyed$: ReplaySubject<boolean> = new ReplaySubject(1);

  constructor(private serviceA: ServiceA) {}

  ngOnInit() {
    this.serviceA
      .takeUntil(this.destroyed$)
      .subscribe(...);
  }

  ngOnDestroy() {
    this.destroyed$.next(true);
    this.destroyed$.complete();
  }
}

Source: link

23

Using the componentDestroyed() function from the npm package @w11k/ngx-componentdestroyed is by far the easiest way to use takeUntil:

@Component({
  selector: 'foo',
  templateUrl: './foo.component.html'
})
export class FooComponent implements OnInit, OnDestroy {
  ngOnInit() {
    Observable.interval(1000)
      .takeUntil(componentDestroyed(this)) // <--- magic is here!
      .subscribe(console.log);
  }

  ngOnDestroy() {}
}

Here's a version of componentDestroyed() to include directly in your code:

// Based on https://www.npmjs.com/package/ng2-rx-componentdestroyed
import { OnDestroy } from '@angular/core';
import { ReplaySubject } from 'rxjs/ReplaySubject';

export function componentDestroyed(component: OnDestroy) {
  const oldNgOnDestroy = component.ngOnDestroy;
  const destroyed$ = new ReplaySubject<void>(1);
  component.ngOnDestroy = () => {
    oldNgOnDestroy.apply(component);
    destroyed$.next(undefined);
    destroyed$.complete();
  };
  return destroyed$;
}

Source: link

15

Well, this comes down to what you mean by closing a subscription. There're basically two ways to do this:

  1. Using an operator that completes the chain (such as takeWhile()).
  2. Unsubscribe from the source Observable.

It's good to know that these two aren't the same.

When using for example takeWhile() you make the operator send complete notification which is propagated to your observers. So if you define:

...
.subscribe(..., ..., () => doWhatever());

Then when you complete the chain with eg. takeWhile() the doWhatever() function will be called.

For example it could look like this:

const Observable = Rx.Observable;
const Subject = Rx.Subject;

let source = Observable.timer(0, 1000);
let subject = new Subject();

source.takeUntil(subject).subscribe(null, null, () => console.log('complete 1'));
source.takeUntil(subject).subscribe(null, null, () => console.log('complete 2'));
source.takeUntil(subject).subscribe(null, null, () => console.log('complete 3'));

setTimeout(() => {
  subject.next();
}, 3000);

After 3s all the complete callbacks will be called.

On the other hand when you unsubscribe you're saying that you're no longer interested in the items produced by the source Observable. However this doesn't mean the source has to complete. You just don't care any more.

This means that you can collect all Subscriptions from .subscribe(...) calls and unsubscribe all of them at once:

let subscriptions = new Rx.Subscription();
let source = Observable.timer(0, 1000);

subscriptions.add(source.subscribe(null, null, () => console.log('complete 1')));
subscriptions.add(source.subscribe(null, null, () => console.log('complete 2')));
subscriptions.add(source.subscribe(null, null, () => console.log('complete 3')));

setTimeout(() => {
  subscriptions.unsubscribe();
}, 3000);

Now after 3s delay nothing will be printed to console because we unsubscribed and no complete callback was invoked.

So what you want to use is up to you and your use-case. Just be aware that unsubscribing is not the same as completing even though I guess in your situation it doesn't really matter.

Source: link

0

The code is also creating an interval observable that you subscribe to when an onStartInterval method gets called.
import { Component, OnInit, OnDestroy } from '@angular/core';

import { Subscription, interval } from 'rxjs';

import { Apollo } from 'apollo-angular';
import gql from 'graphql-tag';

@Component({ ... })
export class AppComponent implements OnInit, OnDestroy {
  myQuerySubscription: Subscription;
  myIntervalSubscription: Subscription;

  constructor(private apollo: Apollo) {}

  ngOnInit() {
    this.myQuerySubscription = this.apollo.watchQuery<any>({
      query: gql`
        query getAllPosts {
          allPosts {
            title
            description
            publishedAt
          }
        }
      `
    })
    .valueChanges
    .subscribe(({data}) => {
      console.log(data);
    });
  }

  onStartInterval() {
    this.myIntervalSubscription = interval(250).subscribe(value => {
      console.log('Current value:', value);
    });
  }

  ngOnDestroy() {
    this.myQuerySubscription.unsubscribe();

    if (this.myIntervalSubscription) {
      this.myIntervalSubscription.unsubscribe();
    }
  }
}
The following snippet does the exact same thing, but this time the code will unsubscribe declaratively. You will notice that an added benefit is that you no longer need to keep references to our subscriptions anymore.
import { Component, OnInit, OnDestroy } from '@angular/core';

import { Subject, interval } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

import { Apollo } from 'apollo-angular';
import gql from 'graphql-tag';

@Component({ ... })
export class AppComponent implements OnInit, OnDestroy {
  destroy$: Subject<boolean> = new Subject<boolean>();

  constructor(private apollo: Apollo) {}

  ngOnInit() {
    this.apollo.watchQuery<any>({
      query: gql`
        query getAllPosts {
          allPosts {
            title
            description
            publishedAt
          }
        }
      `
    })
    .valueChanges
    .pipe(takeUntil(this.destroy$))
    .subscribe(({data}) => {
      console.log(data);
    });
  }

  onStartInterval() {
    interval(250)
    .pipe(takeUntil(this.destroy$))
    .subscribe(value => {
      console.log('Current value:', value);
    });
  }

  ngOnDestroy() {
    this.destroy$.next(true);
    this.destroy$.unsubscribe();
  }
}

Source: link

0

EDIT: Different since RxJS 6.x: Note the use of the pipe() method.
class myComponent {
  private destroyed$: ReplaySubject<boolean> = new ReplaySubject(1);

  constructor(
    private serviceA: ServiceA,
    private serviceB: ServiceB,
    private serviceC: ServiceC) {}

  ngOnInit() {
    this.serviceA
      .pipe(takeUntil(this.destroyed$))
      .subscribe(...);
    this.serviceB
      .pipe(takeUntil(this.destroyed$))
      .subscribe(...);
    this.serviceC
      .pipe(takeUntil(this.destroyed$))
      .subscribe(...);
  }

  ngOnDestroy() {
    this.destroyed$.next(true);
    this.destroyed$.complete();
  }
}
This is only valid for RxJS 5.x and older:
class myComponentOld {
  private destroyed$: ReplaySubject<boolean> = new ReplaySubject(1);

  constructor(private serviceA: ServiceA) {}

  ngOnInit() {
    this.serviceA
      .takeUntil(this.destroyed$)
      .subscribe(...);
  }

  ngOnDestroy() {
    this.destroyed$.next(true);
    this.destroyed$.complete();
  }
}
Using the componentDestroyed() function from the npm package @w11k/ngx-componentdestroyed is by far the easiest way to use takeUntil:
@Component({
  selector: 'foo',
  templateUrl: './foo.component.html'
})
export class FooComponent implements OnInit, OnDestroy {
  ngOnInit() {
    Observable.interval(1000)
      .takeUntil(componentDestroyed(this)) // <--- magic is here!
      .subscribe(console.log);
  }

  ngOnDestroy() {}
}
Here's a version of componentDestroyed() to include directly in your code:
// Based on https://www.npmjs.com/package/ng2-rx-componentdestroyed
import { OnDestroy } from '@angular/core';
import { ReplaySubject } from 'rxjs/ReplaySubject';

export function componentDestroyed(component: OnDestroy) {
  const oldNgOnDestroy = component.ngOnDestroy;
  const destroyed$ = new ReplaySubject<void>(1);
  component.ngOnDestroy = () => {
    oldNgOnDestroy.apply(component);
    destroyed$.next(undefined);
    destroyed$.complete();
  };
  return destroyed$;
}
When using for example takeWhile() you make the operator send complete notification which is propagated to your observers. So if you define:
...
.subscribe(..., ..., () => doWhatever());

Source: link

Recent Questions on angular

    Programming Languages