RxJS 序列等价于 promise.then()?

IT技术 javascript rxjs
2021-02-23 06:42:41

我曾经用 promise 进行过很多开发,现在我正在转向 RxJS。RxJS 的文档没有提供一个非常清晰的例子来说明如何从Promise链转移到观察者序列。

例如,我通常用多个步骤编写Promise链,例如

// a function that returns a promise
getPromise()
.then(function(result) {
   // do something
})
.then(function(result) {
   // do something
})
.then(function(result) {
   // do something
})
.catch(function(err) {
    // handle error
});

我应该如何以 RxJS 风格重写这个Promise链?

6个回答

对于数据流(相当于then):

Rx.Observable.fromPromise(...)
  .flatMap(function(result) {
   // do something
  })
  .flatMap(function(result) {
   // do something
  })
  .subscribe(function onNext(result) {
    // end of chain
  }, function onError(error) {
    // process the error
  });

可以将 promise 转换为 observable Rx.Observable.fromPromise

一些Promise运算符有直接翻译。例如RSVP.all, 或jQuery.when可以替换为Rx.Observable.forkJoin

请记住,您有一堆操作符,它们允许异步转换数据,并执行您不能或很难用 Promise 完成的任务。Rxjs 展示了异步数据序列(即超过 1 个异步值的序列)的所有功能。

对于错误管理,这个主题有点复杂。

  • 也有catchfinally操作符
  • retryWhen 还可以帮助在出现错误时重复序列
  • 您还可以使用该onError函数处理订阅者本身的错误

要获得精确的语义,请更深入地查看您可以在网络上找到的文档和示例,或在此处提出具体问题。

这绝对是使用 Rxjs 进行更深入错误管理的一个很好的起点:https ://xgrommx.github.io/rx-book/content/getting_started_with_rxjs/creating_and_querying_observable_sequences/error_handling.html

Promise.then是而.flatMap不是.map
2021-04-20 06:42:41
FYI这是不完全等同于Promise从第3版本的错误then会被卡住catch他们不是在这里。
2021-04-20 06:42:41
正是如此。如果没有观察者通过 subscribe 传递,您的 observable 将不会发出任何数据,因此您不会看到任何数据流。
2021-04-30 06:42:41
我总是看到可观察序列以 subscribe() 结尾。由于这只是 observable 对象的一个​​函数,有什么理由这样做吗?是启动序列的功能吗?
2021-05-06 06:42:41
我建议你看看这个:gist.github.com/staltz/868e7e9bc2a7b8c1f754它可能比官方文档更可口。
2021-05-10 06:42:41

一个更现代的选择:

import {from as fromPromise} from 'rxjs';
import {catchError, flatMap} from 'rxjs/operators';

fromPromise(...).pipe(
   flatMap(result => {
       // do something
   }),
   flatMap(result => {
       // do something
   }),
   flatMap(result => {
       // do something
   }),
   catchError(error => {
       // handle error
   })
)

另请注意,要使所有这些工作,您需要将其subscribe通过管道传输Observable到某处,但我假设它是在应用程序的其他部分处理的。

我对 RxJS 很陌生,但鉴于我们在这里只处理一个事件的初始流mergeMap()因此实际上没有任何要合并的内容,我相信在这种情况下我们可以使用concatMap()switchMap()我做对了吗...?
2021-04-29 06:42:41

2019 年 5 月更新,使用 RxJs 6

同意上面提供的答案,希望使用RxJs v6添加一个带有一些玩具数据和简单Promise(使用 setTimeout)的具体示例以增加清晰度。

只需将传递的 id(当前硬编码为1更新为不存在的内容即可执行错误处理逻辑。重要的是,还要注意ofwithcatchError消息的使用

import { from as fromPromise, of } from "rxjs";
import { catchError, flatMap, tap } from "rxjs/operators";

const posts = [
  { title: "I love JavaScript", author: "Wes Bos", id: 1 },
  { title: "CSS!", author: "Chris Coyier", id: 2 },
  { title: "Dev tools tricks", author: "Addy Osmani", id: 3 }
];

const authors = [
  { name: "Wes Bos", twitter: "@wesbos", bio: "Canadian Developer" },
  {
    name: "Chris Coyier",
    twitter: "@chriscoyier",
    bio: "CSS Tricks and CodePen"
  },
  { name: "Addy Osmani", twitter: "@addyosmani", bio: "Googler" }
];

function getPostById(id) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      const post = posts.find(post => post.id === id);
      if (post) {
        console.log("ok, post found!");
        resolve(post);
      } else {
        reject(Error("Post not found!"));
      }
    }, 200);
  });
}

function hydrateAuthor(post) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      const authorDetails = authors.find(person => person.name === post.author);
      if (authorDetails) {
        post.author = authorDetails;
        console.log("ok, post hydrated with author info");
        resolve(post);
      } else {
        reject(Error("Author not Found!"));
      }
    }, 200);
  });
}

function dehydratePostTitle(post) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      delete post.title;
      console.log("ok, applied transformation to remove title");
      resolve(post);
    }, 200);
  });
}

// ok, here is how it looks regarding this question..
let source$ = fromPromise(getPostById(1)).pipe(
  flatMap(post => {
    return hydrateAuthor(post);
  }),
  flatMap(post => {
    return dehydratePostTitle(post);
  }),
  catchError(error => of(`Caught error: ${error}`))
);

source$.subscribe(console.log);

输出数据:

ok, post found!
ok, post hydrated with author info
ok, applied transformation to remove title
{ author:
   { name: 'Wes Bos',
     twitter: '@wesbos',
     bio: 'Canadian Developer' },
  id: 1 }

关键部分等价于以下使用简单的 promise 控制流:

getPostById(1)
  .then(post => {
    return hydrateAuthor(post);
  })
  .then(post => {
    return dehydratePostTitle(post);
  })
  .then(author => {
    console.log(author);
  })
  .catch(err => {
    console.error(err);
  });
完美的答案,但现在 flatMap 已被弃用!新方法是什么?
2021-05-06 06:42:41
flatMap -> mergeMap
2021-05-10 06:42:41

如果我理解正确,您的意思是使用这些值,在这种情况下,您使用 sbuscribe 即

const arrObservable = from([1,2,3,4,5,6,7,8]);
arrObservable.subscribe(number => console.log(num) );

此外,您可以使用 toPromise() 将 observable 转换为 promise,如下所示:

arrObservable.toPromise().then()

如果getPromise函数在流管道的中间,你应该简单地将它包装成函数之一mergeMapswitchMap或者concatMap(通常mergeMap):

stream$.pipe(
   mergeMap(data => getPromise(data)),
   filter(...),
   map(...)
 ).subscribe(...);

如果你想开始你的流,getPromise()然后把它包装成from函数:

import {from} from 'rxjs';

from(getPromise()).pipe(
   filter(...)
   map(...)
).subscribe(...);