Skip to content

Commit 3e8d082

Browse files
committed
Add IPipeline::cc() to allow more flexible workflows
Also: - Add `IPipeline::collectThenIf()` for completeness
1 parent 08c5a8f commit 3e8d082

File tree

3 files changed

+160
-3
lines changed

3 files changed

+160
-3
lines changed

src/Contract/IPipeline.php

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,13 +162,35 @@ public function thenIf(callable $callback);
162162
*/
163163
public function collectThen(callable $callback);
164164

165+
/**
166+
* Collect results from the pipeline and pass them to a callback in batches
167+
* if a collectThen() callback hasn't already been applied
168+
*
169+
* @template TThenOutput
170+
* @param callable(array<TInput|TOutput> $results, IPipeline<TInput,TOutput,TArgument> $pipeline, TArgument $arg): iterable<TThenOutput> $callback
171+
* @return IPipeline<TInput,TThenOutput,TArgument>
172+
*/
173+
public function collectThenIf(callable $callback);
174+
175+
/**
176+
* Pass each result to a callback
177+
*
178+
* Results not discarded by {@see IPipeline::unless()} are passed to the
179+
* callback before leaving the pipeline. The callback's return value is
180+
* ignored.
181+
*
182+
* @param callable(TOutput $result, IPipeline<TInput,TOutput,TArgument> $pipeline, TArgument $arg): mixed $callback
183+
* @return IPipeline<TInput,TOutput,TArgument>
184+
*/
185+
public function cc(callable $callback);
186+
165187
/**
166188
* Apply a filter to each result
167189
*
168190
* This method can only be called once per pipeline.
169191
*
170192
* Analogous to `array_filter()`, although the effect of the callback's
171-
* return value is inverted.
193+
* return value is reversed.
172194
*
173195
* If `$filter` returns `false`, `$result` is returned to the caller,
174196
* otherwise:

src/Support/Pipeline.php

Lines changed: 113 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,24 +71,39 @@ final class Pipeline extends FluentInterface implements IPipeline
7171
*/
7272
private $CollectThen = false;
7373

74+
/**
75+
* @var array<callable(TOutput, IPipeline<TInput,TOutput,TArgument>, TArgument): mixed>
76+
*/
77+
private $Cc = [];
78+
7479
/**
7580
* @var (callable(TOutput, IPipeline<TInput,TOutput,TArgument>, TArgument): bool)|null
7681
*/
7782
private $Unless;
7883

84+
/**
85+
* Creates a new Pipeline object
86+
*/
7987
public function __construct(?IContainer $container = null)
8088
{
8189
$this->Container = $container;
8290
}
8391

8492
/**
85-
* @return IPipeline<TInput,TOutput,TArgument>
93+
* Get a new pipeline
94+
*
95+
* Syntactic sugar for `new Pipeline()`.
96+
*
97+
* @return self<TInput,TOutput,TArgument>
8698
*/
87-
public static function create(?IContainer $container = null): IPipeline
99+
public static function create(?IContainer $container = null): self
88100
{
89101
return new self($container);
90102
}
91103

104+
/**
105+
* @inheritDoc
106+
*/
92107
public function send($payload, $arg = null)
93108
{
94109
$clone = clone $this;
@@ -100,6 +115,9 @@ public function send($payload, $arg = null)
100115
return $clone;
101116
}
102117

118+
/**
119+
* @inheritDoc
120+
*/
103121
public function stream(iterable $payload, $arg = null)
104122
{
105123
$clone = clone $this;
@@ -111,6 +129,9 @@ public function stream(iterable $payload, $arg = null)
111129
return $clone;
112130
}
113131

132+
/**
133+
* @inheritDoc
134+
*/
114135
public function withConformity($conformity = ArrayKeyConformity::PARTIAL)
115136
{
116137
$clone = clone $this;
@@ -119,6 +140,9 @@ public function withConformity($conformity = ArrayKeyConformity::PARTIAL)
119140
return $clone;
120141
}
121142

143+
/**
144+
* @inheritDoc
145+
*/
122146
public function after(callable $callback)
123147
{
124148
if ($this->After) {
@@ -130,6 +154,9 @@ public function after(callable $callback)
130154
return $clone;
131155
}
132156

157+
/**
158+
* @inheritDoc
159+
*/
133160
public function afterIf(callable $callback)
134161
{
135162
if ($this->After) {
@@ -139,6 +166,9 @@ public function afterIf(callable $callback)
139166
return $this->after($callback);
140167
}
141168

169+
/**
170+
* @inheritDoc
171+
*/
142172
public function through(...$pipes)
143173
{
144174
$clone = clone $this;
@@ -147,6 +177,9 @@ public function through(...$pipes)
147177
return $clone;
148178
}
149179

180+
/**
181+
* @inheritDoc
182+
*/
150183
public function throughCallback(callable $callback)
151184
{
152185
return $this->through(
@@ -155,6 +188,9 @@ public function throughCallback(callable $callback)
155188
);
156189
}
157190

191+
/**
192+
* @inheritDoc
193+
*/
158194
public function throughKeyMap(array $keyMap, int $flags = ArrayMapperFlag::ADD_UNMAPPED)
159195
{
160196
return $this->through(
@@ -167,6 +203,9 @@ public function throughKeyMap(array $keyMap, int $flags = ArrayMapperFlag::ADD_U
167203
);
168204
}
169205

206+
/**
207+
* @inheritDoc
208+
*/
170209
public function then(callable $callback)
171210
{
172211
if ($this->Then) {
@@ -178,6 +217,9 @@ public function then(callable $callback)
178217
return $clone;
179218
}
180219

220+
/**
221+
* @inheritDoc
222+
*/
181223
public function thenIf(callable $callback)
182224
{
183225
if ($this->Then) {
@@ -187,6 +229,9 @@ public function thenIf(callable $callback)
187229
return $this->then($callback);
188230
}
189231

232+
/**
233+
* @inheritDoc
234+
*/
190235
public function collectThen(callable $callback)
191236
{
192237
if ($this->Then) {
@@ -199,6 +244,32 @@ public function collectThen(callable $callback)
199244
return $clone;
200245
}
201246

247+
/**
248+
* @inheritDoc
249+
*/
250+
public function collectThenIf(callable $callback)
251+
{
252+
if ($this->Then) {
253+
return $this;
254+
}
255+
256+
return $this->collectThen($callback);
257+
}
258+
259+
/**
260+
* @inheritDoc
261+
*/
262+
public function cc(callable $callback)
263+
{
264+
$clone = clone $this;
265+
$clone->Cc[] = $callback;
266+
267+
return $clone;
268+
}
269+
270+
/**
271+
* @inheritDoc
272+
*/
202273
public function unless(callable $filter)
203274
{
204275
if ($this->Unless) {
@@ -210,6 +281,9 @@ public function unless(callable $filter)
210281
return $clone;
211282
}
212283

284+
/**
285+
* @inheritDoc
286+
*/
213287
public function unlessIf(callable $filter)
214288
{
215289
if ($this->Unless) {
@@ -219,6 +293,9 @@ public function unlessIf(callable $filter)
219293
return $this->unless($filter);
220294
}
221295

296+
/**
297+
* @inheritDoc
298+
*/
222299
public function run()
223300
{
224301
if ($this->Stream || $this->CollectThen) {
@@ -240,9 +317,16 @@ public function run()
240317
throw new PipelineResultRejectedException($this->Payload, $result);
241318
}
242319

320+
if ($this->Cc) {
321+
$this->ccResult($result);
322+
}
323+
243324
return $result;
244325
}
245326

327+
/**
328+
* @inheritDoc
329+
*/
246330
public function start(): Generator
247331
{
248332
if (!$this->Stream) {
@@ -271,6 +355,10 @@ public function start(): Generator
271355
continue;
272356
}
273357

358+
if ($this->Cc) {
359+
$this->ccResult($result);
360+
}
361+
274362
yield $key => $result;
275363
}
276364

@@ -280,25 +368,48 @@ public function start(): Generator
280368

281369
$results = ($this->Then)($results, $this, $this->Arg);
282370
foreach ($results as $key => $result) {
371+
if ($this->Cc) {
372+
$this->ccResult($result);
373+
}
374+
283375
yield $key => $result;
284376
}
285377
}
286378

379+
/**
380+
* @inheritDoc
381+
*/
287382
public function getConformity()
288383
{
289384
return $this->PayloadConformity;
290385
}
291386

387+
/**
388+
* @inheritDoc
389+
*/
292390
public function runInto(IPipeline $next)
293391
{
294392
return $next->send($this->run(), $this->Arg);
295393
}
296394

395+
/**
396+
* @inheritDoc
397+
*/
297398
public function startInto(IPipeline $next)
298399
{
299400
return $next->stream($this->start(), $this->Arg);
300401
}
301402

403+
/**
404+
* @param mixed $result
405+
*/
406+
private function ccResult($result): void
407+
{
408+
foreach ($this->Cc as $callback) {
409+
$callback($result, $this, $this->Arg);
410+
}
411+
}
412+
302413
/**
303414
* @param mixed $payload
304415
* @return never

tests/unit/Support/PipelineTest.php

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,30 @@ public function testAfter(): void
5555
);
5656
}
5757

58+
public function testCc(): void
59+
{
60+
$in = [12, 23, 34, 45, 56, 67, 78, 89, 90];
61+
$out1 = [];
62+
$out2 = [];
63+
foreach ((new Pipeline())
64+
->stream($in)
65+
->through(fn($payload, Closure $next) => $next($payload * 6))
66+
->cc(function ($result) use (&$out1) { $out1[] = round($result / 23, 3); })
67+
->start() as $_out) {
68+
$out2[] = $_out;
69+
}
70+
71+
$this->assertSame(
72+
[3.13, 6.0, 8.87, 11.739, 14.609, 17.478, 20.348, 23.217, 23.478],
73+
$out1
74+
);
75+
76+
$this->assertSame(
77+
[72, 138, 204, 270, 336, 402, 468, 534, 540],
78+
$out2
79+
);
80+
}
81+
5882
public function testUnless(): void
5983
{
6084
$in = [12, 23, 34, 45, 56, 67, 78, 89, 90];

0 commit comments

Comments
 (0)