Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions src/Fork.php
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ protected function finishTask(Task $task): mixed
return $output;
}

protected function forkForTask(Task $task): Task
protected function forkForTask(Task $task): ?Task
{
[$socketToParent, $socketToChild] = Connection::createPair();

Expand All @@ -129,8 +129,13 @@ protected function forkForTask(Task $task): Task
try {
$this->executeInChildTask($task, $socketToParent);
} finally {
exit();
if (! extension_loaded('posix')) {
exit();
}

posix_kill(getmypid(), SIGKILL);
}

}

$socketToParent->close();
Expand Down Expand Up @@ -158,7 +163,7 @@ protected function listenForSignals(): void
protected function exit(): void
{
if (! extension_loaded('posix')) {
exit;
exit();
}

foreach ($this->runningTasks as $task) {
Expand Down
11 changes: 5 additions & 6 deletions src/Task.php
Original file line number Diff line number Diff line change
Expand Up @@ -130,14 +130,13 @@ public function isFinished(): bool
{
$this->output .= $this->connection->read()->current();

$status = pcntl_waitpid($this->pid(), $status, WNOHANG | WUNTRACED);
if (pcntl_waitpid($this->pid(), $status, WNOHANG | WUNTRACED) === $this->pid) {

if ($status === $this->pid) {
return true;
}
if (pcntl_wexitstatus($status) !== 0) {
throw CouldNotManageTask::make($this);
}

if ($status !== 0) {
throw CouldNotManageTask::make($this);
return true;
}

return false;
Expand Down
52 changes: 34 additions & 18 deletions tests/ForkTest.php
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
<?php

use Carbon\Carbon;
use Spatie\Fork\Exceptions\CouldNotManageTask;
use Spatie\Fork\Fork;

it('will execute the given closures')
test('will execute the given closures')
->expect(
fn () => Fork::new()->run(
fn () => 1 + 1,
fn () => 2 + 2,
)
)->toEqual([2, 4]);
)->toEqual([2, 4])->not()->toThrow(CouldNotManageTask::class);

it('will execute the given closure with concurrency cap', function () {
test('will execute the given closure with concurrency cap', function () {
$results = Fork::new()
->concurrent(2)
->run(
Expand All @@ -36,21 +37,21 @@ function () {
->and($results[2])->not->toEqual($results[1]);
});

it('can execute the closures concurrently', function () {
test('can execute the closures concurrently', function () {
Fork::new()
->run(
...array_fill(
start_index: 0,
count: 20,
value: fn () => usleep(100_000),
) // 1/10th of a second each
start_index: 0,
count: 20,
value: fn () => usleep(100_000),
) // 1/10th of a second each
);

assertTookLessThanSeconds(1);
});

test('the callable given to before runs before each callable')
->expect(
test('the callable given to before runs before each callable', function() {
expect(
Fork::new()
->before(function () {
global $globalBeforeValue;
Expand All @@ -63,9 +64,10 @@ function () {
return 1 + $globalBeforeValue;
})
)->toEqual([3]);
});

test('the callable given to after runs after each callable')
->expect(
test('the callable given to after runs after each callable', function() {
expect(
Fork::new()
->after(function () {
global $globalAfterValue;
Expand All @@ -82,6 +84,7 @@ function () {
},
)
)->toEqual([1]);
});

test('the callable given to before can be run in the parent process', function () {
$value = 0;
Expand Down Expand Up @@ -131,16 +134,17 @@ function () {
expect($value)->toEqual(2);
});

it('will not hang by truncating the result when large output is returned')
->expect(
test('will not hang by truncating the result when large output is returned', function() {
expect(
Fork::new()->run(
fn () => file_get_contents('https://stitcher.io/rss'),
fn () => file_get_contents('https://sebastiandedeyne.com/index.xml'),
fn () => file_get_contents('https://rubenvanassche.com/rss/'),
)
)->toHaveCount(3);
});

it('can return objects', function () {
test('can return objects', function () {
$result = Fork::new()
->run(
fn () => new DateTime('2021-01-01'),
Expand All @@ -163,9 +167,9 @@ function () {
);
});

test('allow 2nd process to be done before the 1st')
->expect(
fn () => Fork::new()->run(
test('allow 2nd process to be done before the 1st', function() {
expect(
Fork::new()->run(
static function () {
usleep(200_000);

Expand All @@ -178,3 +182,15 @@ static function () {
},
)
)->toEqual([2,1]);
});

test('it throws exception in case of unexpected exit status', function() {
expect(
fn() => Fork::new()->run(
static function() {
exit(1);
},
)
)->toThrow(CouldNotManageTask::class);

});