How to create a toggle stream in RxJS?

By : RookieProgrammer
Date : November 22 2020, 09:00 AM
The solution was to move distinctUntilChanged onto the merged observable and have it compare events by event type and key code. This ensures that subscribers are notified once for the keydown event and once for the keyup event of the same key.
code :
var filterByTypeAndKeyCode = function(e) { return e.type + e.which; }
var target = document.querySelector('body');
var keyUpStream = Rx.Observable.fromEvent(target, 'keyup');
var keyDownStream = Rx.Observable.fromEvent(target, 'keydown');

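Rx.Observable.merge(keyDownStream, keyUpStream)
    // Drop consecutive events that share the same type + key code
    // (key selector as first argument, as in RxJS 4)
    .distinctUntilChanged(filterByTypeAndKeyCode)
    .subscribe(function(e) {
        // handle the keydown / keyup toggle here
    });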
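
To see why keying on event type plus key code collapses a held key into a single keydown, here is a small dependency-free sketch. It is not RxJS: distinctUntilChangedBy is a hypothetical helper that mimics the operator on a plain array, and the events are made-up objects carrying only type and which.

```javascript
// Keep an item only when its key differs from the previous item's key,
// which is how distinctUntilChanged behaves with a key selector.
function distinctUntilChangedBy(items, keySelector) {
  const result = [];
  let hasPrevious = false;
  let previousKey;
  for (const item of items) {
    const key = keySelector(item);
    if (!hasPrevious || key !== previousKey) {
      result.push(item);
    }
    previousKey = key;
    hasPrevious = true;
  }
  return result;
}

// Same key selector as in the answer: event type plus key code.
const filterByTypeAndKeyCode = (e) => e.type + e.which;

// Hypothetical events: key 65 is held down (auto-repeat), then released.
const events = [
  { type: 'keydown', which: 65 },
  { type: 'keydown', which: 65 },
  { type: 'keydown', which: 65 },
  { type: 'keyup', which: 65 },
];

const toggles = distinctUntilChangedBy(events, filterByTypeAndKeyCode);
console.log(toggles.map((e) => e.type)); // [ 'keydown', 'keyup' ]
```

Only consecutive duplicates are dropped, so pressing the same key again after releasing it still produces a fresh keydown.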

RXJS : Idiomatic way to create an observable stream from a paged interface

By : Jinath
Date : March 29 2020, 07:55 AM
That's a reasonable way to do it. It has a few potential flaws (which may or may not matter, depending on your use case):
- You provide no way to observe any errors that occur in this.connect(start).
- Your observable is effectively hot. If the caller does not immediately subscribe to the observable (perhaps they store it and subscribe later), they'll miss the completion of this.connect(start) and the observable will appear to never produce anything.
- You provide no way to unsubscribe from the initial connect call if the caller changes their mind and unsubscribes early. Not a big deal, but when constructing an observable one should usually chain the disposables together so it all cleans up properly if the caller unsubscribes.
code :
function records(start = 'LATEST', limit = 1000) {
    return Rx.Observable.create(observer => {
        let pages = new Rx.Subject();
        let connectSub = new Rx.SingleAssignmentDisposable();
        let resultsSub = new Rx.SingleAssignmentDisposable();
        let sub = new Rx.CompositeDisposable(connectSub, resultsSub);

        // Make sure we subscribe to pages before we issue this.connect()
        // just in case this.connect() finishes synchronously (possible if it caches values or something?)
        let results = pages
            .flatMap(page => this.read(page, limit))
            .doOnNext(r => this.next !== undefined ? pages.onNext(this.next) : pages.onCompleted())
            .flatMap(r => r.data);

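        // forward the flattened records (and any errors) to the caller
        resultsSub.setDisposable(results.subscribe(observer));

        // now query the first page
        connectSub.setDisposable(this.connect(start)
            .subscribe(p => pages.onNext(p), e => observer.onError(e)));

        return sub;
    });
}
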
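
The paging loop itself (read a page, emit its data, follow the next pointer until there is none) can be sketched without RxJS. The version below deliberately swaps in a different technique, a synchronous generator, purely to show the control flow. The readPage function, the pagesById table, and the result.next field are hypothetical stand-ins; the original keeps the next pointer on this.next.

```javascript
// Hypothetical in-memory paged source standing in for this.read(page, limit).
const pagesById = {
  a: { data: [1, 2], next: 'b' },
  b: { data: [3], next: 'c' },
  c: { data: [4, 5], next: undefined },
};

// Walk the pages lazily: nothing is read until the caller starts iterating,
// and abandoning the iteration early stops further reads.
function* records(start, readPage) {
  let page = start;
  while (page !== undefined) {
    const result = readPage(page);
    yield* result.data; // flatten the page into individual records
    page = result.next; // undefined ends the stream
  }
}

const allRecords = [...records('a', (id) => pagesById[id])];
console.log(allRecords); // [ 1, 2, 3, 4, 5 ]
```

Because the generator does no work until iterated, it behaves like a cold source, which is the property the first two points above ask for.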
Create a toggle button with RxJS

By : saleh
Date : March 29 2020, 07:55 AM
Does your button carry its state only through the DOM? In that case it is impossible to retrieve that state without querying the DOM.
You could instead keep the state outside the DOM and manipulate it without DOM functions. You will still have to update the DOM at some point, but you can do that outside the stream, in the observer.
code :
var button_initial_state = false; // false for unchecked

var button_state$ = Rx.Observable.fromEvent(btn, 'click')
                                 .scan (function(current_state, event){
                                    return !current_state;
                                  }, button_initial_state)

button_state$.subscribe (function (state){
  // Code to set the button state in the DOM
});

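
The accumulation that scan performs can be checked without a browser or RxJS. The sketch below uses a hypothetical scanStates helper over a plain array of fake click markers; it only mirrors the accumulator-and-seed idea and is not the library's implementation.

```javascript
// Emit the accumulated state after every input, like scan(accumulator, seed).
function scanStates(inputs, accumulator, seed) {
  const states = [];
  let current = seed;
  for (const input of inputs) {
    current = accumulator(current, input);
    states.push(current);
  }
  return states;
}

const buttonInitialState = false; // false for unchecked
const clicks = ['click', 'click', 'click']; // hypothetical click events

// Each click flips the previous state; the event itself is ignored.
const buttonStates = scanStates(clicks, (currentState) => !currentState, buttonInitialState);
console.log(buttonStates); // [ true, false, true ]
```

The seed is never emitted on its own, so the DOM should be initialised to the same value as the seed before the first click arrives.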
Angular & RxJS - Create Stream using value from another Stream

By : unityikpe
Date : March 29 2020, 07:55 AM
I guess you want to call the HTTP service any time the page number changes, i.e. any time pageToLoad$ emits.
If this is the case, then you may want to try something like:
code :
pageToLoad$
    .switchMap(pageNumber => loadPageOfEmpresas(pageNumber))

rxjs switchMap cache the obsolete result and do not create new stream

By : user2405441
Date : March 29 2020, 07:55 AM
One problem is that you actually execute Math.random and fetch while setting up your test case, so each stream only ever holds that one result.
code :
// calling Math.random() => using the return value
const s1$ = of(Math.random())

// calling fetch => using the return value (a promise)
const s3$ = from(fetch(`https://api.github.com/users?per_page=5`))

Wrapping the calls in defer postpones them until each subscription:
code :
const { of, defer, fromEvent } = rxjs;
const { ajax }                 = rxjs.ajax;
const { switchMap }            = rxjs.operators;

// defer Math.random()
const s1$ = defer(() => of(Math.random()));

// no defer needed here (already a stream)
const s2$ = ajax.getJSON('https://api.github.com/users?per_page=5');

// defer `fetch`, but `from` is not needed, as a promise is sufficient
const s3$ = defer(() => fetch('https://api.github.com/users?per_page=5'));

const t1$ = fromEvent(document.getElementById('s1'), 'click').pipe(switchMap(() => s1$));
const t2$ = fromEvent(document.getElementById('s2'), 'click').pipe(switchMap(() => s2$));
const t3$ = fromEvent(document.getElementById('s3'), 'click').pipe(switchMap(() => s3$));

<script src="https://unpkg.com/@reactivex/rxjs@6/dist/global/rxjs.umd.js"></script>

<button id="s1">test random</button>
<button id="s2">test ajax</button>
<button id="s3">test fetch</button>
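
The difference between the eager and deferred versions can be reproduced without RxJS. In the sketch below, ofValue and deferred are hypothetical stand-ins (not the library's of and defer), a "source" is just a function that hands one value to a callback, and a counter replaces Math.random so the result is deterministic.

```javascript
// Minimal stand-ins: subscribing to a source means calling it with a callback.
const ofValue = (value) => (next) => next(value);
const deferred = (factory) => (next) => factory()(next);

let calls = 0;
const nextId = () => ++calls; // deterministic stand-in for Math.random()

// nextId() runs once, right here, while the source is being built.
const eagerSource = ofValue(nextId());
// nextId() runs inside the factory, once per subscription.
const lazySource = deferred(() => ofValue(nextId()));

const eagerSeen = [];
const lazySeen = [];
eagerSource((v) => eagerSeen.push(v));
eagerSource((v) => eagerSeen.push(v));
lazySource((v) => lazySeen.push(v));
lazySource((v) => lazySeen.push(v));

console.log(eagerSeen); // [ 1, 1 ]
console.log(lazySeen);  // [ 2, 3 ]
```

The eager source replays the value computed at setup time, which is why the original test appeared to cache an obsolete result on every click.
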
Toggle Rxjs stream from other observables

By : user3336467
Date : March 29 2020, 07:55 AM
observable.subscribe() returns a Subscription object, which you can use to unsubscribe.
code :
let subscription;

_videoInfo$.subscribe((res): void => {
  // when res emits it means a new video is starting
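  subscription = _data$.subscribe();
});

// stop$ is a placeholder name: the observable that signals the end of the
// video is not shown in the original snippet
stop$.subscribe({
 next: () => {
  if (subscription) {
    subscription.unsubscribe();
  }
 },
 error: () => {},
 complete: () => {}
});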
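
The same start/stop pattern can be exercised end to end with a tiny stand-in for an observable. Everything here is an assumption for illustration: createSource is a hypothetical helper whose subscribe returns an object with unsubscribe, and stop$ is an invented name for whatever signals that the video ended. The sketch also unsubscribes any earlier subscription before starting a new one, which the snippet above does not do.

```javascript
// Minimal stand-in for a subject: subscribe() returns an object with unsubscribe().
function createSource() {
  const listeners = new Set();
  return {
    emit: (value) => listeners.forEach((listener) => listener(value)),
    subscribe: (listener) => {
      listeners.add(listener);
      return { unsubscribe: () => listeners.delete(listener) };
    },
  };
}

const videoInfo$ = createSource(); // emits when a new video starts
const stop$ = createSource();      // hypothetical "video ended" signal
const data$ = createSource();      // the stream being toggled

const received = [];
let subscription;

videoInfo$.subscribe(() => {
  // Drop any previous subscription so two videos never listen at once.
  if (subscription) subscription.unsubscribe();
  subscription = data$.subscribe((value) => received.push(value));
});

stop$.subscribe(() => {
  if (subscription) {
    subscription.unsubscribe();
    subscription = undefined;
  }
});

data$.emit('ignored: nothing subscribed yet');
videoInfo$.emit('video 1');
data$.emit('a');
stop$.emit();
data$.emit('ignored: stopped');
videoInfo$.emit('video 2');
data$.emit('b');

console.log(received); // [ 'a', 'b' ]
```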
