RxJS

参考

create observable

// create: build an observable by hand from a subscriber function
// (`new Observable(...)` replaces the deprecated `Observable.create(...)`)
const stream$ = new Observable(observer => {
  observer.next(1);
  // error() terminates the stream: nothing is delivered after it
  observer.error('error message');
  // shown for API completeness only; ignored here because error() already ended the stream
  observer.complete();

  // teardown: runs when the stream terminates or the subscriber unsubscribes
  return () => {
    someCleanUp();
  };
});

// producer
/**
 * Minimal value source: hands out consecutive integers starting at 0.
 */
class Producer {
  constructor() {
    // next value to hand out
    this.i = 0;
  }

  /**
   * Returns the current counter value, then advances the counter by one.
   * @returns {number} the value before the increment
   */
  nextValue() {
    const issued = this.i++;
    return issued;
  }
}

// range: emits a run of consecutive integers (start 1, count 3 => 1, 2, 3)
let stream$ = range(1, 3);

// from: converts an existing source (e.g. an Array or a Promise) into an observable
let stream$ = from(/* Array, Promise */);

// of: emits each argument in order, then completes
let stream$ = of(1, 2, 3, 4);

observe

// subscribe: register the next / error / complete callbacks, in that order
const subscription = stream$.subscribe(onSuccess, onError, onComplete);

// unsubscribe: stop receiving values and release the subscription
subscription.unsubscribe();

// cold observable => 1,2,3,1,2,3
// every subscribe() call replays the whole sequence for that subscriber
let stream$ = of(1, 2, 3);
stream$.subscribe(data => console.log(data));
stream$.subscribe(data => console.log(data));

// hot observable => 0,1,2,2,3,3,4,4
// the source is shared: a late subscriber only sees values emitted after it joins
let stream$ = interval(1000).pipe(
  take(5),
  publish(), // important: makes the stream connectable (nothing is emitted until connect())
);
// first subscriber: registered before connect(), so it receives every value
stream$.subscribe(
  data => console.log(`subscriber from first minute: ${data}`),
  err => console.log(err),
  () => console.log('completed'),
);
// second subscriber: joins after 2.1s, so it misses 0 and 1
setTimeout(() => {
  stream$.subscribe(
    data => console.log(`subscriber from 2nd minute: ${data}`),
    err => console.log(err),
    () => console.log('completed'),
  );
}, 2100);
stream$.connect(); // important: starts the shared interval

// warm observable => 0,1,2,3,3,4,4
// (the source does not start until the first subscriber arrives)
let stream$ = interval(1000).pipe(
  take(5),
  publish(),
  refCount(), // important: connects automatically on the first subscription
);
// first subscriber (at 2s) starts the interval and receives 0..4
setTimeout(() => {
  stream$.subscribe(data => console.log(data));
}, 2000);
// second subscriber (at 5.1s) joins the running stream and only receives 3 and 4
setTimeout(() => {
  stream$.subscribe(data => console.log(data));
}, 5100);

Operators

// of: emit the given values in order
const stream$ = of(1, 2, 3, 4, 5);

// tap: run a side effect for each value without changing it
tap(value => console.log(value));

// filter: only pass through values matching the predicate (here: even numbers)
filter(value => value % 2 === 0);

// creating an observable inside an observable
const stream$ = of(1, 2, 3).pipe(
  // plain `map` would produce an Observable of Observables; `flatMap`
  // subscribes to each inner request and flattens the responses into one stream
  flatMap(val =>
    // ajax() is a creation function (it returns an Observable), not an
    // operator, so it is called directly instead of being passed to pipe()
    ajax({ url: `${url}${val}` }).pipe(map(e => e.response)),
  ),
);

// fetch: fetch() returns a Promise, which from() turns into an observable;
// res.json() returns a Promise as well, so flatMap flattens it into the parsed body
from(fetch('https://jsonplaceholder.typicode.com/posts/1/'))
  .pipe(flatMap(res => from(res.json())))
  .subscribe(console.log);

// to promise: resolves with the last emitted value (3 here) once the stream completes
// NOTE(review): toPromise() is deprecated in RxJS 7 (lastValueFrom / firstValueFrom) — confirm the target version
const promise = of(1, 2, 3).toPromise();

combination

// combineLatest: whenever any source emits, emit an array holding the latest
// value of every source  (1,2,3) (1,2) => last emission is [3,2]
let stream$ = combineLatest(source1, source2);

// concat: subscribe to the sources one after another, moving on only when
// the previous one completes  (1,2,3) (1,2) => (1,2,3,1,2)
let stream$ = concat(source1, source2);

// merge: subscribe to all sources at once and pass values through as they
// arrive  (1,2,3) (1,2) => e.g. (1,1,2,2,3) when both emit at the same pace
let stream$ = merge(source1, source2);

// zip: pair values by position; unpaired values are dropped
// (1,2,3) (1,2) => ([1,1], [2,2])
let stream$ = zip(source1, source2);

math

// max / min: emit the largest / smallest value once the source completes
max();
min();
// sum: RxJS has no built-in `sum` operator — fold the values with reduce instead
reduce((total, value) => total + value, 0);

time

// creating observables
interval(100); // emits every 100ms
timer(1000); // emits once after 1sec
timer(5000, 1000); // waits 5sec, then emits every 1sec

// operators
delay(100); // delays each value by 100ms
debounceTime(500); // emits a value only after 0.5s of silence; a newer value discards the pending one and restarts the timer

grouping

// buffer: collect values into an array and emit it whenever the notifier fires
// (0,1,2,3,4,5,6,7,8,9) => ([0,1,2,3,4],[5,6,7,8,9])
let breakWhen$ = interval(500);
let stream$ = interval(100).pipe(buffer(breakWhen$));
stream$.subscribe(data => console.log('values', data));

// bufferTime
bufferTime(500);
// is equivalent to...
buffer(interval(500));

error handling

// retry
// on error, resubscribe to the source and replay the whole stream from the beginning
// (1, 2) => (1,1,1,1,1,1,error): the first run plus 5 retries, then the error is passed on
let stream$ = of(1, 2).pipe(
  map(value => {
    if (value > 1) {
      // throw an Error (not a bare string) so subscribers get a message and a stack trace
      throw new Error('error');
    }
    return value;
  }),
  retry(5),
);

// retryWhen: resubscribe each time the notifier emits — here 200ms after every error
// (operators are applied through pipe(); chained `.delay()` only existed in RxJS 5)
retryWhen(errors => errors.pipe(delay(200)));

// catchError (turns the error into regular values and passes them downstream)
let error$ = throwError('crash').pipe(catchError(err => of('patched', err)));
error$.subscribe(
  data => console.log(data),
  err => console.error(err),
  () => console.log('complete'),
);
// ('patched', 'crash', 'complete') all arrive through the non-error callbacks