@@ -71,24 +71,39 @@ final class Pipeline extends FluentInterface implements IPipeline
7171 */
7272 private $ CollectThen = false ;
7373
74+ /**
75+ * @var array<callable(TOutput, IPipeline<TInput,TOutput,TArgument>, TArgument): mixed>
76+ */
77+ private $ Cc = [];
78+
7479 /**
7580 * @var (callable(TOutput, IPipeline<TInput,TOutput,TArgument>, TArgument): bool)|null
7681 */
7782 private $ Unless ;
7883
84+ /**
85+ * Creates a new Pipeline object
86+ */
7987 public function __construct (?IContainer $ container = null )
8088 {
8189 $ this ->Container = $ container ;
8290 }
8391
8492 /**
85- * @return IPipeline<TInput,TOutput,TArgument>
93+ * Get a new pipeline
94+ *
95+ * Syntactic sugar for `new Pipeline()`.
96+ *
97+ * @return self<TInput,TOutput,TArgument>
8698 */
87- public static function create (?IContainer $ container = null ): IPipeline
99+ public static function create (?IContainer $ container = null ): self
88100 {
89101 return new self ($ container );
90102 }
91103
104+ /**
105+ * @inheritDoc
106+ */
92107 public function send ($ payload , $ arg = null )
93108 {
94109 $ clone = clone $ this ;
@@ -100,6 +115,9 @@ public function send($payload, $arg = null)
100115 return $ clone ;
101116 }
102117
118+ /**
119+ * @inheritDoc
120+ */
103121 public function stream (iterable $ payload , $ arg = null )
104122 {
105123 $ clone = clone $ this ;
@@ -111,6 +129,9 @@ public function stream(iterable $payload, $arg = null)
111129 return $ clone ;
112130 }
113131
132+ /**
133+ * @inheritDoc
134+ */
114135 public function withConformity ($ conformity = ArrayKeyConformity::PARTIAL )
115136 {
116137 $ clone = clone $ this ;
@@ -119,6 +140,9 @@ public function withConformity($conformity = ArrayKeyConformity::PARTIAL)
119140 return $ clone ;
120141 }
121142
143+ /**
144+ * @inheritDoc
145+ */
122146 public function after (callable $ callback )
123147 {
124148 if ($ this ->After ) {
@@ -130,6 +154,9 @@ public function after(callable $callback)
130154 return $ clone ;
131155 }
132156
157+ /**
158+ * @inheritDoc
159+ */
133160 public function afterIf (callable $ callback )
134161 {
135162 if ($ this ->After ) {
@@ -139,6 +166,9 @@ public function afterIf(callable $callback)
139166 return $ this ->after ($ callback );
140167 }
141168
169+ /**
170+ * @inheritDoc
171+ */
142172 public function through (...$ pipes )
143173 {
144174 $ clone = clone $ this ;
@@ -147,6 +177,9 @@ public function through(...$pipes)
147177 return $ clone ;
148178 }
149179
180+ /**
181+ * @inheritDoc
182+ */
150183 public function throughCallback (callable $ callback )
151184 {
152185 return $ this ->through (
@@ -155,6 +188,9 @@ public function throughCallback(callable $callback)
155188 );
156189 }
157190
191+ /**
192+ * @inheritDoc
193+ */
158194 public function throughKeyMap (array $ keyMap , int $ flags = ArrayMapperFlag::ADD_UNMAPPED )
159195 {
160196 return $ this ->through (
@@ -167,6 +203,9 @@ public function throughKeyMap(array $keyMap, int $flags = ArrayMapperFlag::ADD_U
167203 );
168204 }
169205
206+ /**
207+ * @inheritDoc
208+ */
170209 public function then (callable $ callback )
171210 {
172211 if ($ this ->Then ) {
@@ -178,6 +217,9 @@ public function then(callable $callback)
178217 return $ clone ;
179218 }
180219
220+ /**
221+ * @inheritDoc
222+ */
181223 public function thenIf (callable $ callback )
182224 {
183225 if ($ this ->Then ) {
@@ -187,6 +229,9 @@ public function thenIf(callable $callback)
187229 return $ this ->then ($ callback );
188230 }
189231
232+ /**
233+ * @inheritDoc
234+ */
190235 public function collectThen (callable $ callback )
191236 {
192237 if ($ this ->Then ) {
@@ -199,6 +244,32 @@ public function collectThen(callable $callback)
199244 return $ clone ;
200245 }
201246
247+ /**
248+ * @inheritDoc
249+ */
250+ public function collectThenIf (callable $ callback )
251+ {
252+ if ($ this ->Then ) {
253+ return $ this ;
254+ }
255+
256+ return $ this ->collectThen ($ callback );
257+ }
258+
259+ /**
260+ * @inheritDoc
261+ */
262+ public function cc (callable $ callback )
263+ {
264+ $ clone = clone $ this ;
265+ $ clone ->Cc [] = $ callback ;
266+
267+ return $ clone ;
268+ }
269+
270+ /**
271+ * @inheritDoc
272+ */
202273 public function unless (callable $ filter )
203274 {
204275 if ($ this ->Unless ) {
@@ -210,6 +281,9 @@ public function unless(callable $filter)
210281 return $ clone ;
211282 }
212283
284+ /**
285+ * @inheritDoc
286+ */
213287 public function unlessIf (callable $ filter )
214288 {
215289 if ($ this ->Unless ) {
@@ -219,6 +293,9 @@ public function unlessIf(callable $filter)
219293 return $ this ->unless ($ filter );
220294 }
221295
296+ /**
297+ * @inheritDoc
298+ */
222299 public function run ()
223300 {
224301 if ($ this ->Stream || $ this ->CollectThen ) {
@@ -240,9 +317,16 @@ public function run()
240317 throw new PipelineResultRejectedException ($ this ->Payload , $ result );
241318 }
242319
320+ if ($ this ->Cc ) {
321+ $ this ->ccResult ($ result );
322+ }
323+
243324 return $ result ;
244325 }
245326
327+ /**
328+ * @inheritDoc
329+ */
246330 public function start (): Generator
247331 {
248332 if (!$ this ->Stream ) {
@@ -271,6 +355,10 @@ public function start(): Generator
271355 continue ;
272356 }
273357
358+ if ($ this ->Cc ) {
359+ $ this ->ccResult ($ result );
360+ }
361+
274362 yield $ key => $ result ;
275363 }
276364
@@ -280,25 +368,48 @@ public function start(): Generator
280368
281369 $ results = ($ this ->Then )($ results , $ this , $ this ->Arg );
282370 foreach ($ results as $ key => $ result ) {
371+ if ($ this ->Cc ) {
372+ $ this ->ccResult ($ result );
373+ }
374+
283375 yield $ key => $ result ;
284376 }
285377 }
286378
379+ /**
380+ * @inheritDoc
381+ */
287382 public function getConformity ()
288383 {
289384 return $ this ->PayloadConformity ;
290385 }
291386
387+ /**
388+ * @inheritDoc
389+ */
292390 public function runInto (IPipeline $ next )
293391 {
294392 return $ next ->send ($ this ->run (), $ this ->Arg );
295393 }
296394
395+ /**
396+ * @inheritDoc
397+ */
297398 public function startInto (IPipeline $ next )
298399 {
299400 return $ next ->stream ($ this ->start (), $ this ->Arg );
300401 }
301402
403+ /**
404+ * @param mixed $result
405+ */
406+ private function ccResult ($ result ): void
407+ {
408+ foreach ($ this ->Cc as $ callback ) {
409+ $ callback ($ result , $ this , $ this ->Arg );
410+ }
411+ }
412+
302413 /**
303414 * @param mixed $payload
304415 * @return never
0 commit comments