javascript - How to forEach over a whole stream after a set interval in RxJS -


i have stream of zip codes want iterate on after set interval. i'm trying update temperature shown on page sending zip codes , getting data api. have send 1 @ time. need able distinct zip codes after interval , iterate on whole stream. want update temperature on page.

// stream of zip codes const zipcodestream =   rx.observable     .fromevent(zipcodeinput, 'input')     .map(e => e.target.value)     .filter(zip => zip.length === 5);  // create timer refresh data rx.observable   .interval(5000)   .zip(zipcodestream, ([i, zip]) => zip)   .foreach((...args) => {     console.log('interval foreach args', ...args);   }); 

this sends single zip code when new zip code entered , interval has passed. want access them iterate over.

since want save item emitted zipcodestream can iterate on them every 5 seconds, you'll need use replaysubject. these save items emitted, , replay them whenever observer subscribes.

in contrast, current zipcodestream observable "hot" observable. means starts emitting items it's created, , subsequent subscribers 'miss' items emitted before subscribed.

const zipcodestream =   rx.observable     .fromevent(zipcodeinput, 'input')     .map(e => e.target.value)     .filter(zip => zip.length === 5);  const zipcodesubject = new rx.replaysubject(); const zipcodedisposable = zipcodestream.subscribe(zipcodesubject);  rx.observable   .interval(5000)   // every 5000ms, emit items zipcodesubject   .flatmaplatest(() => zipcodesubject)   .foreach((...args) => {     console.log('interval foreach args', ...args);   }); 

Comments

Popular posts from this blog

java - Jasper subreport showing only one entry from the JSON data source when embedded in the Title band -

serialization - Convert Any type in scala to Array[Byte] and back -

SonarQube Plugin for Jenkins does not find SonarQube Scanner executable -