-
Notifications
You must be signed in to change notification settings - Fork 31
/
007-fiber.ts
183 lines (143 loc) · 4.41 KB
/
007-fiber.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
import { pipe, Effect, Exit, Fiber, Array, Duration } from "effect";
import { satisfies } from "effect/Function";
/*
* Until now we executed effects in a way that made them look synchronous.
*
* That's one special aspect of Effect - you can mix async and sync code in
* the same program, without labeling functions separately.
*
* To execute an effect without blocking the current process, we can use fibers,
* which are a lightweight concurrency mechanism.
*/
/** Minimal value object wrapping a numeric id; used as the result of the demo effects. */
class Identifier {
  /** The wrapped numeric id. */
  readonly id: number;

  constructor(id: number) {
    this.id = id;
  }
}
/**
 * Builds an effect that sleeps, logs a wake-up message, and then succeeds
 * with an `Identifier` wrapping `id`.
 *
 * @param id - Number stored in the resulting `Identifier` and echoed in the log line.
 * @param millis - Sleep duration in milliseconds (defaults to 1000, i.e. one second).
 *   It is handed to `Duration.millis`, so it is NOT a number of seconds.
 * @returns An effect that yields the `Identifier` after the delay and the log line.
 */
const sleeper = (id: number, millis = 1000) => {
  const identifier = new Identifier(id);
  return pipe(
    Effect.sleep(Duration.millis(millis)),
    Effect.tap(() => Effect.log(`waked from ${identifier.id}`)),
    // Replace the void result of the sleep with the identifier.
    Effect.as(identifier),
  );
};
/**
 * Forks a sleeper into its own fiber, logs around the fork, then joins the
 * fiber to obtain its `Identifier` and logs it as JSON.
 */
export const example1 = Effect.gen(function* () {
  yield* Effect.log("before");

  // Start the sleeper in a child fiber; we get a handle to it back.
  const sleeping = sleeper(1);
  const child = yield* Effect.fork(sleeping);
  child satisfies Fiber.RuntimeFiber<Identifier>;

  yield* Effect.log("after");

  // Joining waits for the child fiber and hands back its success value.
  const joined = yield* Fiber.join(child);
  joined satisfies Identifier;

  const serialized = JSON.stringify(joined);
  yield* Effect.log(serialized);
});
// Effect.runPromise(example1);
/*
* Running it yields:
*
* fiber=#0 message="before"
* fiber=#0 message="after"
* fiber=#1 message="waked from 1"
* fiber=#0 message="{"id":1}"
*
* As you can notice, the forked code runs in a separate fiber.
*/
/**
 * Builds an effect that sleeps for one second and then fails with the
 * literal error "blah".
 *
 * The log and success steps after the failure are kept so that the success
 * type of the resulting effect is `Identifier`.
 *
 * @param id - Identifier used in the (post-failure) log message and success value.
 */
const longFailing = (id: Identifier) => {
  const delay = Effect.sleep(Duration.seconds(1));
  const failure = Effect.fail("blah" as const);

  return delay.pipe(
    Effect.flatMap(() => failure),
    Effect.tap(() => Effect.log(`waked from ${id.id}`)),
    Effect.flatMap(() => Effect.succeed(id)),
  );
};
/*
* Using Fiber.join / joinAll will result in a catchable error when running a
* failing effect
*/
/**
 * Forks the failing effect and joins the resulting fiber; the joined value
 * is typed as `Identifier`.
 */
export const example2 = Effect.gen(function* () {
  const doomed = longFailing(new Identifier(1));
  const child = yield* Effect.fork(doomed);

  const joined = yield* Fiber.join(child);
  joined satisfies Identifier;
});
// Effect.runPromise(example2).catch(x => console.log('error', x));
/*
* An alternative is Fiber.await, which gives back an Exit describing the fiber's result
*/
/**
 * Forks the failing effect and awaits the fiber, obtaining an `Exit` value
 * (typed `Exit<Identifier, "blah">`) that is then logged as JSON.
 */
export const example3 = Effect.gen(function* () {
  const doomed = longFailing(new Identifier(1));
  const child = yield* Effect.fork(doomed);

  const outcome = yield* Fiber.await(child);
  outcome satisfies Exit.Exit<Identifier, "blah">;

  const serialized = JSON.stringify(outcome);
  yield* Effect.log(serialized);
});
/*
* Effect makes it easier to write concurrent code despite concurrent code
* usually being notoriously difficult to write correctly.
*/
// Three sleepers described as [id, delay] pairs; the delays are not in id
// order, so the order in which they wake differs from their array order.
const effects = (
  [
    [1, 300],
    [2, 100],
    [3, 200],
  ] as const
).map(([id, delay]) => sleeper(id, delay));
//    ^ Effect<Identifier>[]
/*
* Most of Effect's high-level functions that handle Iterables accept an
* options object as their last argument, which allows enabling concurrency
*/
// Shared options object passed to the concurrent examples in this file.
const concurrent: { readonly concurrency: "inherit" } = {
  concurrency: "inherit",
};
// inherit:
// uses the current concurrency value (set with Effect.withConcurrency),
// if nothing is set this defaults to unbounded
//
// unbounded:
// uses as many fibers as possible
//
// integer value:
// uses exactly that many fibers
/**
 * Runs all `effects` through `Effect.all` with a concurrency of 5 and prints
 * the collected identifiers with `console.log`.
 */
export const example4 = Effect.gen(function* () {
  // The generator needs no adapter parameter: effects are yielded directly.
  const ids = yield* Effect.all(effects, { concurrency: 5 });
  //    ^ Identifier[]
  console.log(ids);
});
// Effect.runPromise(example4);
/*
* fiber=#2 message="waked from 2"
* fiber=#3 message="waked from 3"
* fiber=#1 message="waked from 1"
* [ Identifier { id: 1 }, Identifier { id: 2 }, Identifier { id: 3 } ]
*/
/**
 * Maps every sleeper to its numeric id, sums the ids with
 * `Effect.reduceEffect` (starting from 0, using the shared `concurrent`
 * options) and prints the total.
 */
export const example5 = Effect.gen(function* () {
  // Turn each Effect<Identifier> into an Effect<number> carrying just the id.
  const numericIds = pipe(
    effects,
    Array.map(effect => Effect.map(effect, identifier => identifier.id)),
  );
  numericIds satisfies readonly Effect.Effect<number>[];

  const total = yield* Effect.reduceEffect(
    numericIds,
    Effect.succeed(0),
    (runningTotal, id) => runningTotal + id,
    concurrent,
  );
  console.log(total);
});
// Effect.runPromise(example5);
/*
* fiber=#2 message="waked from 2"
* fiber=#3 message="waked from 3"
* fiber=#1 message="waked from 1"
* 6
*/
/**
 * Races all `effects` against each other with `Effect.raceAll` and prints
 * the id of the winning sleeper.
 */
export const example6 = Effect.gen(function* () {
  // Only the winner's Identifier comes back from the race.
  const fastest = yield* Effect.raceAll(effects);
  console.log(fastest.id);
});
// Effect.runPromise(example6);
/*
* fiber=#2 message="waked from 2"
* 2
*/
/**
 * Creates one sleeper per number in [7, 8, 9] via `Effect.forEach`, using the
 * shared `concurrent` options, and prints the resulting identifiers.
 */
export const example7 = Effect.gen(function* () {
  // The callback is wrapped in a lambda so that only the element is forwarded
  // to sleeper and its second (delay) parameter keeps its default.
  const awakened = yield* Effect.forEach(
    [7, 8, 9],
    id => sleeper(id),
    concurrent,
  );
  //    ^ Identifier[]
  console.log(awakened);
});
// Effect.runPromise(example7);
/*
* fiber=#1 message="waked from 7"
* fiber=#2 message="waked from 8"
* fiber=#3 message="waked from 9"
* [ Identifier { id: 7 }, Identifier { id: 8 }, Identifier { id: 9 } ]
*/