
关于
使用 fp-ts、ReaderTaskEither 和函数式依赖注入的 Node.js/Deno 后端函数式编程模式。
name: fp-backend description: 使用 fp-ts、ReaderTaskEither 和函数式依赖注入的 Node.js/Deno 后端函数式编程模式 risk: unknown source: community version: 1.0.0 author: kadu tags:
- fp-ts
- typescript
- backend
- functional-programming
- node
- deno
- dependency-injection
- reader-task-either
fp-ts 后端模式
使用 fp-ts 构建类型安全、可测试的后端服务的函数式编程模式。
适用场景
- 你正在使用 fp-ts 构建或重构 Node.js 或 Deno 后端。
- 任务涉及依赖注入、服务组合或使用 ReaderTaskEither 的类型化后端错误。
- 你需要函数式后端架构模式,而非孤立的工具函数片段。
核心概念
ReaderTaskEither (RTE)
ReaderTaskEither<R, E, A> 类型是函数式后端开发的核心:
- R (Reader):依赖/环境(数据库、配置、日志器)
- E (Either left):错误类型
- A (Either right):成功值
import * as RTE from 'fp-ts/ReaderTaskEither'
import * as TE from 'fp-ts/TaskEither'
import { pipe } from 'fp-ts/function'
// 定义依赖
type Deps = {
db: DatabaseClient
logger: Logger
config: Config
}
// 定义领域错误
type AppError =
| { _tag: 'NotFound'; resource: string; id: string }
| { _tag: 'ValidationError'; message: string }
| { _tag: 'DatabaseError'; cause: unknown }
| { _tag: 'Unauthorized'; reason: string }
// 服务函数
const getUser = (id: string): RTE.ReaderTaskEither<Deps, AppError, User> =>
pipe(
RTE.ask<Deps>(),
RTE.flatMap(({ db, logger }) =>
pipe(
RTE.fromTaskEither(db.users.findById(id)),
RTE.mapLeft((e): AppError => ({ _tag: 'DatabaseError', cause: e })),
RTE.flatMap(user =>
user
? RTE.right(user)
: RTE.left({ _tag: 'NotFound', resource: 'User', id })
),
RTE.tap(user => RTE.fromIO(() => logger.info(`Found user: ${user.id}`)))
)
)
)
服务层模式
定义服务模块
将服务结构化为导出 RTE 函数的模块:
// src/services/user.service.ts
import * as RTE from 'fp-ts/ReaderTaskEither'
import * as TE from 'fp-ts/TaskEither'
import * as A from 'fp-ts/Array'
import { pipe } from 'fp-ts/function'
type UserDeps = {
db: DatabaseClient
hasher: PasswordHasher
mailer: EmailService
}
type UserError =
| { _tag: 'UserNotFound'; id: string }
| { _tag: 'EmailExists'; email: string }
| { _tag: 'InvalidPassword' }
// 创建用户
export const create = (
input: CreateUserInput
): RTE.ReaderTaskEither<UserDeps, UserError, User> =>
pipe(
RTE.ask<UserDeps>(),
RTE.flatMap(({ db, hasher }) =>
pipe(
// 检查邮箱唯一性
checkEmailUnique(input.email),
RTE.flatMap(() =>
RTE.fromTaskEither(hasher.hash(input.password))
),
RTE.flatMap(hashedPassword =>
RTE.fromTaskEither(
db.users.create({
...input,
password: hashedPassword,
})
)
)
)
)
)
// 按 ID 查找
export const findById = (
id: string
): RTE.ReaderTaskEither<UserDeps, UserError, User> =>
pipe(
RTE.ask<UserDeps>(),
RTE.flatMap(({ db }) =>
pipe(
RTE.fromTaskEither(db.users.findUnique({ where: { id } })),
RTE.flatMap(user =>
user
? RTE.right(user)
: RTE.left({ _tag: 'UserNotFound' as const, id })
)
)
)
)
// 分页查询
export const findMany = (
params: PaginationParams
): RTE.ReaderTaskEither<UserDeps, UserError, PaginatedResult<User>> =>
pipe(
RTE.ask<UserDeps>(),
RTE.flatMap(({ db }) =>
RTE.fromTaskEither(
pipe(
TE.Do,
TE.bind('users', () => db.users.findMany({
skip: params.offset,
take: params.limit,
})),
TE.bind('total', () => db.users.count()),
TE.map(({ users, total }) => ({
data: users,
total,
...params,
}))
)
)
)
)
const checkEmailUnique = (
email: string
): RTE.ReaderTaskEither<UserDeps, UserError, void> =>
pipe(
RTE.ask<UserDeps>(),
RTE.flatMap(({ db }) =>
pipe(
RTE.fromTaskEither(db.users.findUnique({ where: { email } })),
RTE.flatMap(existing =>
existing
? RTE.left({ _tag: 'EmailExists' as const, email })
: RTE.right(undefined)
)
)
)
)
组合服务
// src/services/order.service.ts
import * as UserService from './user.service'
import * as ProductService from './product.service'
import * as PaymentService from './payment.service'
type OrderDeps = UserService.UserDeps &
ProductService.ProductDeps &
PaymentService.PaymentDeps & {
db: DatabaseClient
}
兼容工具
Claude CodeCursor
标签
前端开发