// RxJS v6+
// In v6 creation functions are exported from the package root; the
// 'rxjs/observable/interval' deep path is the RxJS 5.x layout.
import { interval } from 'rxjs';
import { takeUntil, filter, scan, map, withLatestFrom } from 'rxjs/operators';

// Emit an incrementing value every 1s.
const source = interval(1000);

// Is the number even?
const isEven = (val) => val % 2 === 0;

// Only allow values that are even.
const evenSource = source.pipe(filter(isEven));

// Keep a running total of how many even numbers have been emitted.
const evenNumberCount = evenSource.pipe(scan((acc, _) => acc + 1, 0));

// Notifier for takeUntil: passes only once the running count exceeds 5,
// i.e. on the sixth even number, so five values are logged before completion.
const fiveEvenNumbers = evenNumberCount.pipe(filter((val) => val > 5));

const example = evenSource.pipe(
  // Also pair each value with the current even-number count for display.
  withLatestFrom(evenNumberCount),
  map(([val, count]) => `Even number (${count}) : ${val}`),
  // When five even numbers have been emitted, complete the source observable.
  takeUntil(fiveEvenNumbers),
);

/*
  Expected output:
    Even number (1) : 0
    Even number (2) : 2
    Even number (3) : 4
    Even number (4) : 6
    Even number (5) : 8
*/
const subscribe = example.subscribe((val) => console.log(val));
Example 3: Take mouse events on mouse down until mouse up