On the one side, we have forks for parallelization. On the other, we have microservices. Jobs are similar to both and different from both at the same time. Let's start with this comparison.
Forks are coroutines. They work within the same process (and even within the same thread, because workers are single-threaded). The context is switched between forks at network-wait points: a fork works, starts waiting for the network, and another fork becomes active. When you start 5 forks, the first works until a network query, then the second until a network query, and so on. Global variables are shared. CPU execution can't be parallelized.
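To make the contrast concrete, here is a minimal sketch; loadProfile() is a hypothetical function performing a network query, while fork() and wait() are the usual KPHP primitives:
// both forks live in the same process and even the same thread;
// the second one gets CPU time only while the first is waiting for the network
$f1 = fork(loadProfile(1));
$f2 = fork(loadProfile(2));
// network waits overlap, but pure CPU work inside loadProfile() is not parallelized
$profile1 = wait($f1);
$profile2 = wait($f2);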
Jobs are separate processes. When you start 5 jobs, they start executing simultaneously (if there are 5 free job workers). You can parallelize any PHP code and even start jobs that stay alive after the http worker ends the PHP script. From a job, you're allowed to launch forks and even other jobs. Global variables are initialized from scratch. But yes — there is an overhead to launching jobs, much bigger than for launching forks.
Microservices (kphp-as-rpc) are typically launched on other machines (or a cluster). If they become overloaded, you should add more machines to that cluster. There is a TL scheme of queries and responses, along with autogenerated PHP classes. You should think about backwards compatibility using field masks, as microservices could be running an old version of backend code. Sending a response means serializing PHP instances according to the TL scheme, sending them over the network, and deserializing them on the other side. From PHP's point of view, querying a microservice is the same as querying any other RPC engine.
Jobs are processes on the same machine. If they become overloaded, you should rebalance the http/job workers ratio or add more machines generally. Jobs and TL have nothing in common: the data is described by manually written PHP classes. You don't deal with backwards compatibility, as jobs are processes of the same KPHP server binary (which also serves HTTP). Querying a job means copying data to shared memory. From PHP's point of view, it's a call to JobLauncher::start() that returns a future, followed by wait().
Ideologically, jobs and microservices are close, but jobs are much more performant due to the absence of [de]serialization and sending data over the network (though not free of charge, of course).
It will calculate pow($a, 2) for every element of an array in a separate process. Input: [1,2,3,4,5], output: [1,4,9,16,25].
What we will need:
class MyRequest extends \JobWorkers\JobWorkerSimple {
    /** @var int[] */
    public $arr_to_x2;

    function __construct(array $arr_to_x2) {
        $this->arr_to_x2 = $arr_to_x2;
    }

    function handleRequest(): ?\KphpJobWorkerResponse {
        $response = new MyResponse();
        $response->arr_x2 = array_map(fn($v) => $v ** 2, $this->arr_to_x2);
        return $response;
    }
}

class MyResponse implements \KphpJobWorkerResponse {
    /** @var int[] */
    public $arr_x2;
}
Now, launch a job:
$arr = [1, 2, 3, 4, 5];
$timeout = 0.1;
$job_request = new MyRequest($arr);
$job_id = \JobWorkers\JobLauncher::start($job_request, $timeout);
// ... in practice, some useful code works between start and wait
$response = wait($job_id);
if ($response instanceof MyResponse) {
    var_dump($response->arr_x2);
}
This demo is available in the kphp-snippets/JobWorkers repository on GitHub.
Open that link, follow the instructions to compile and run it. The demo is based on the code above.
All in all, what we have done: called JobLauncher::start(), got a future (like a fork!), spent some time in wait(), and got the response. The actual parallelization happened internally.
At the current moment, there are 3 job policies. Your job must extend one of these classes:
1) JobWorkerSimple: work and finish
“Simple” means “understandable”, “clear”, because it's the most intuitive usage:
On wait(), the http worker continues after the job worker finishes.
2) JobWorkerManualRespond: work, send a response and continue working
“ManualRespond” means “responding in the middle of execution”:
On wait(), the http worker continues in the middle of the job worker's execution. After wait(), they work simultaneously (a sketch follows below).
3) JobWorkerNoReply: respondless workers
“NoReply” means “never return a response”:
The http worker continues execution immediately after sending a job: wait() can't be called, a response won't exist.
The http worker can even finish the script and reset, and a launched job worker will still continue running.
It's similar to fastcgi_finish_request(): perform some actions in the background. The purpose is to respond to a user as quickly as possible, writing stats and doing other background work afterward.
The kphp-snippets repository has examples of all three policies.
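To give a rough idea of the ManualRespond policy, here is a minimal sketch; RankPostsRequest and RankPostsResponse are hypothetical classes, and the exact signatures may differ slightly, so treat the repository above as the authoritative reference:
class RankPostsResponse implements \KphpJobWorkerResponse {
    /** @var int[] */
    public $ranked_ids = [];
}

class RankPostsRequest extends \JobWorkers\JobWorkerManualRespond {
    /** @var int[] */
    public $post_ids;

    function __construct(array $post_ids) {
        $this->post_ids = $post_ids;
    }

    function handleRequest() {
        $response = new RankPostsResponse();
        $response->ranked_ids = $this->post_ids; // pretend they have been ranked
        // the caller's wait() unfreezes right here, in the middle of execution
        $this->respondAndContinueExecution($response);
        // ... keep working in the background: warm up caches, write stats, etc.
    }
}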
Fallbacks are necessary without any doubt.
First of all, job workers may not exist:
$use_job_workers = JobLauncher::isEnabled() && isToggleOn('...');
if ($use_job_workers) {
    // try to launch a job, e.g. JobLauncher::start()
} else {
    // local fallback
}
In plain PHP, there are no job workers: start() and similar functions don't work at all, and JobLauncher::isEnabled() returns false. Hence, the code above covers plain PHP as well.
What is a fallback to local execution? The supposed pattern is to call the same handleRequest(), but within the same process. In pseudocode, it looks like this:
// here's the idea (pseudocode)
if ($use_job_workers) {
    $job_id = JobLauncher::start($job_request, $timeout);
} else {
    $response = $job_request->handleRequest();
}
The idea is clear, but the code above is unhandy: inside if we have a future, inside else — a ready result. To make it handy, we want to have a future in the else branch as well, so that the rest of the code stays the same. To achieve this, use the localWaitableFallback() wrapper, which matches the types as expected:
$job_request = new MyJobRequest(...);
$use_job_workers = ...;
if ($use_job_workers) {
    $job_id = JobLauncher::start($job_request, $timeout);
} else {
    $job_id = $job_request->localWaitableFallback(); // future onto the local handleRequest()
}
$response = wait($job_id);
Errors may occur in various scenarios:
- JobLauncher::start() returns false
- wait() returns false
- wait() returns KphpJobWorkerResponseError

When everything works as expected, wait() returns an instance of the class the job has responded with.

Hence, you have to handle these cases:
- start() returns false — fallback to local execution
- wait() returns KphpJobWorkerResponseError — either fail, or fallback to local execution

We end up with the following pattern, which is supposed to be used:
// prepare a job anyway
$job_request = new MyJobRequest(...);
// when job workers are off, or toggle is off — don't run it
$use_job_workers = JobLauncher::isEnabled() && isToggleOn(...);
// if didn't launch or error launching — local fallback
$job_id = $use_job_workers ? JobLauncher::start($job_request, $timeout) : false;
if (!$job_id) {
    $job_id = $job_request->localWaitableFallback();
}
// ... somewhere later:
$response = wait($job_id);
if ($response instanceof \KphpJobWorkerResponseError) {
    $response->getErrorCode(); // int
    $response->getError(); // string
    // either fail or local fallback
} else if ($response instanceof MyExpectedResponse) {
    // everything is ok, use $response
}
// "else" is impossible if a job returns MyExpectedResponse
Jobs are separate processes, and when a job starts, it doesn't have any context from the caller worker. No context at all: neither $UserId, nor $_SERVER['HTTP_HOST'] — nothing except what is passed directly.
For example, a job needs the global $UserId to run correctly. You may think of doing it this way:
class MyJobRequest extends JobWorkerSimple {
    private mixed $userId;

    function __construct() {
        global $UserId;
        $this->userId = $UserId;
    }

    // DO NOT do like this
    function handleRequest() {
        global $UserId;
        $UserId = $this->userId;
    }
}
It's bad, because handleRequest() changes global state as a side effect. And remember that handleRequest() also serves as the fallback for local execution, so it's very scary if it can change globals in the main process.
What's the right way then? The answer is: overload the methods saveGlobalsContext() and restoreGlobalsContext(). They are automatically called in wrappers. There you save context either into class fields like above, or into mixed[] $untyped_context in case you don't need strict typing (which is often true for globals):
class MyJobRequest extends JobWorkerSimple {
    protected function saveGlobalsContext(array &$untyped_context) {
        global $UserId;
        $untyped_context['UserId'] = $UserId;
    }

    protected function restoreGlobalsContext(array $untyped_context) {
        global $UserId;
        $UserId = $untyped_context['UserId'];
    }
}
And keep handleRequest() pure! Keep in mind that it may be called in the main process.
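For illustration, a sketch of a pure handleRequest() in such a class; the MyResponse field here is hypothetical. After restoreGlobalsContext() has run in a job process (or when the local fallback runs in the main process, where the global already exists), it only reads globals and never writes them:
    function handleRequest(): ?\KphpJobWorkerResponse {
        global $UserId;
        $response = new MyResponse();
        $response->greeting = "hello, user {$UserId}"; // read-only usage of a global
        return $response;
    }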
Your job must extend one of the following classes: JobWorkerSimple, JobWorkerManualRespond, or JobWorkerNoReply.
Extend JobWorkerSimple to implement the policy: take a request, handle it, respond, and finish immediately.
Such jobs are launched by JobLauncher::start() and JobLauncher::startMulti().
handleRequest() is to be overridden: return a response, or null in case of error.
localWaitableFallback() calls handleRequest() and returns a future<> onto it.
Extend JobWorkerManualRespond to implement the policy: take a request, prepare a response, respond, and continue execution in the background.
Such jobs are launched by JobLauncher::start() and JobLauncher::startMulti().
handleRequest() is to be overridden: call $this->respondAndContinueExecution() in the middle.
respondAndContinueExecution() sends a response in the middle of execution; only one response can be sent. After it, wait() in the http worker unfreezes.
localWaitableFallback() calls handleRequest() and returns a future<> onto the result written by $this->respondAndContinueExecution().
Extend JobWorkerNoReply to implement the policy: take a request and never respond, just work and finish at some point.
Such jobs are launched by JobLauncher::startNoReply().
handleRequest() is to be overridden: don't return anything.
The local fallback calls handleRequest() and doesn't return anything; wait() is inapplicable here.
In every job class, you can overload the following methods:
- saveGlobalsContext(): called within the main process; here you save global state into typed class fields or into the provided untyped hashmap, to pass it to a job worker.
- restoreGlobalsContext(): called within a job process; here you restore globals from the fields/context saved by the method above.
- a hook called within the main process, before a job is sent to the queue.
- a hook called within a job process, before handleRequest().
The JobLauncher class: launching jobs and checking the environment. All methods are static.
start($job, $timeout): launches a job, which will be executed in a separate process on the same machine.
Internally, the instance $job is deeply copied into shared memory and is available from the job process for reading.
$timeout is in seconds (0.5 is half a second); it's a timeout for (waiting in the jobs queue + execution).
startMulti(): launches multiple jobs which may share a common memory piece, to avoid copying it multiple times. See below.
startNoReply(): launches a job that never returns a response; it may be left alive even after the http worker ends the PHP script.
It's analogous to fastcgi_finish_request(): write statistics in the background, etc.
Returns true|false instead of future|false (whether the job was added to the jobs queue).
isEnabled(): whether a job worker can be created from the current process. The preferred usage pattern was given above.
getJobWorkersNumber(): the number of job worker processes launched on a server (they are all launched on server start, controlled by the --job-workers-ratio option).
It may be used to group input data into chunks that fit best without waiting in a queue, but keep in mind that job workers are shared across all http workers.
Another helper returns whether we are inside a job worker right now. Try not to use it: keep your logic independent of whether it's running inside a job or not.
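As an illustration of the fire-and-forget flow, here is a sketch; WriteStatsRequest is a hypothetical class extending JobWorkerNoReply, the startNoReply() signature is assumed to mirror start(), and handleRequest() is called directly as a simplistic local fallback:
$stats_request = new WriteStatsRequest($collected_stats);
$launched = JobLauncher::isEnabled() && JobLauncher::startNoReply($stats_request, $timeout);
if (!$launched) {
    // plain PHP, job workers disabled, or the queue is full: write stats synchronously
    $stats_request->handleRequest();
}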
A common use case: you have an input array and want to create several jobs to process it in parallel. This means you need an array of jobs, each operating on its own chunk:
$use_job_workers = ...;
$chunks = ...;
$job_ids = [];
foreach ($chunks as $chunk) {
    $job_request = new MyJobRequest($chunk);
    $job_id = $use_job_workers ? JobLauncher::start($job_request, $timeout) : false;
    $job_ids[] = $job_id ?: $job_request->localWaitableFallback();
}
$responses = wait_multi($job_ids);
// every response should contain info about the chunk to perform the correct merge
We just create jobs in a loop, push them into a future<>[] array, and wait. Any method of waiting is correct — for instance, using a wait queue and polling.
How to split data into chunks? There is no single answer covering every situation. Either just "split into 5 chunks", or vary this number depending on JobLauncher::getJobWorkersNumber(). You might want to leave one chunk to be handled by the main process if it has nothing to do except waiting. Notably, if you have a small amount of data, you'd better not launch any jobs at all and handle it locally.
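As a rough illustration of such a heuristic (the constants 5 and 100 are arbitrary assumptions, not recommendations):
$input = ...;
if (count($input) < 100) {
    // small input: don't launch jobs at all, handle everything locally
    $chunks = [$input];
} else {
    // don't plan more chunks than this server has job workers
    $n_chunks = min(5, max(1, JobLauncher::getJobWorkersNumber()));
    $chunks = array_chunk($input, (int)ceil(count($input) / $n_chunks));
}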
Every job, containing its own chunk, is copied into shared memory. But if some properties are equal across all the jobs, a suggestion is to copy that common piece only once. See the next chapter.
A common use case when launching jobs in chunks is to pass a common context. For example, a general worker has loaded users and posts and wants to launch jobs to rank posts [0,100], [100,200], [200,300], etc.
Of course, this code will work:
class MyRequest {
    public $users;
    public $posts;
    public int $offset;
    public int $limit;
    // __construct()
    // handleRequest()
}
$users = ...;
$posts = ...;
$job_ids = [];
for ($offset = 0; $offset < $total_count; $offset += 100) {
    $request = new MyRequest($users, $posts, $offset, 100);
    $job_ids[] = JobLauncher::start($request, $timeout);
    // don't forget about local fallback
}
But it's bad, since $posts and $users are the same, yet they'll be copied 4 times when 4 jobs are launched. Here is the way to avoid this, i.e. to copy them only once: JobLauncher::startMulti().
/** @kphp-immutable-class */
class MyRequestSharedPiece implements KphpJobWorkerSharedMemoryPiece {
    public $users;
    public $posts;
    // __construct()
}

class MyRequest {
    public MyRequestSharedPiece $ctx;
    public int $offset;
    public int $limit;
    // __construct()
    // handleRequest()
}
$users = ...;
$posts = ...;
$ctx = new MyRequestSharedPiece($users, $posts);
$requests = [];
for ($offset = 0; $offset < $total_count; $offset += 100) {
    $requests[] = new MyRequest($ctx, $offset, 100);
}
$job_ids = JobLauncher::startMulti($requests, $timeout);
// don't forget about local fallback (check $job_ids for false in a loop)
Pay attention to implements KphpJobWorkerSharedMemoryPiece and @kphp-immutable-class.
startMulti() in the KPHP runtime acts the following way: the common piece is copied into shared memory only once, all the requests reference it instead of copying it, and inside handleRequest() it can be used for reading only (due to the immutable annotation) via $this->ctx, as a regular instance. A limitation: a job request can't have more than one field extending KphpJobWorkerSharedMemoryPiece.
Using startMulti(), it's possible to launch just an array of jobs, without any common piece. Then it's equal to calling start() in a loop. But not vice versa: to share a common piece, always use startMulti().
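For completeness, a sketch of the local fallback with startMulti(), assuming (as the comment in the listing above suggests) that every element of the returned array is either a future or false:
$job_ids = JobLauncher::startMulti($requests, $timeout);
foreach ($job_ids as $i => $job_id) {
    if (!$job_id) {
        // this particular job wasn't launched: fall back to local execution
        $job_ids[$i] = $requests[$i]->localWaitableFallback();
    }
}
$responses = wait_multi($job_ids);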
NoReply jobs can't have common pieces; they don't have a multi launch.
When the KPHP server starts, it launches a pool of general (http) workers and job workers.
KPHP serves http and rpc (for kphp-as-microservice). User requests are usually served by nginx, which proxies them to the KPHP backend, into a free general worker. If all workers are busy, a request waits in a system epoll queue.
When a request for a job comes from a neighbour worker, the request data is copied into shared memory, and a signal is sent via a system pipe that a job has been requested. All free job workers listen to this pipe; a random one gets the task first. If all job workers are busy, the task waits in a queue (in the pipe).
How many workers are created exactly? It depends on the options:
- -f N sets the total number of workers (a positive int)
- --job-workers-ratio ratio specifies the share of job workers (a float from 0 to 1)

For example, -f 20 --job-workers-ratio 0.25 gives 15 general workers and 5 job workers.

A job worker is the same as a general worker. It consumes the same amount of memory, it has the same lifecycle and init phase, and the same PHP code is executed within it. Literally the same: the PHP code itself should provide the logic "if I am a job, then fetch and handle".
JobLauncher::start() doesn't create a new process: it just pushes a task into a queue via a system pipe. A pool of workers is created at KPHP server startup.
If 4 job workers were started, and all of them are free at the moment, and PHP code launches 10 jobs — only 4 will start running simultaneously, the other 6 will wait. They will be launched gradually, as the previous ones finish.
If 24 job workers were started, and 20 of them are busy, and PHP code launches 10 jobs — similarly, 4 will start running, and the other 6 will be picked up by those processes as they finish executing their current scripts.
JobLauncher::start() takes $timeout. It's a timeout for (waiting in a queue + execution). If $timeout = 3.0, and a job waited 2 seconds in a queue, it has one second left for execution. If it waits in a queue for more than 3 seconds, it doesn't even start running, and wait() in the caller process returns KphpJobWorkerResponseError with a corresponding error code.
If the jobs queue is constantly growing, it means that the number of job workers is not enough, i.e. PHP code tries to create more jobs than can actually be processed. You can watch the corresponding KPHP metrics in Grafana. A solution is either to launch fewer jobs, or to increase the jobs ratio, or to increase the total number of workers.
Job workers are shared across all general workers. When your settings are balanced, and KPHP works correctly in production, and you publish new code creating some jobs, your current ratio might become imbalanced for the new load profile. That's why it's bad practice to start jobs without providing toggles in PHP code that can disable them in production. Usually, the more work we do in jobs, the less work is left for general workers, so the ratio should be increased a bit, but that's a matter of heuristics in every particular situation.
In production, you'd probably have servers with varying technical characteristics. On high-capacity machines with lots of memory, 10 jobs would really run in parallel, while on others half of them would be queued. When you split data into chunks, it's probably a good idea not to launch a constant number X of jobs, but to vary X depending on JobLauncher::getJobWorkersNumber().
On KPHP server start, along with processes and pipes, a huge piece of shared memory is allocated. General workers write requests into it, job workers read requests from it, job workers write responses, and general workers read those responses.
JobLauncher::start($request, $timeout) does the following in detail:
- $request is deeply copied from script memory into shared memory;
- a signal is sent via a system pipe that a job has been requested;
- a free job worker picks the task up and deep-copies the request from shared memory into its own script memory;
- the response travels back the same way: the job worker writes it into shared memory, and the caller deep-copies it into its script memory on wait().

"Deep copy" from script memory into shared memory doesn't actually allocate anything. Just like workers, this memory is allocated once at KPHP server start.
Shared memory is divided into pieces. The basic ones are 512 KBytes in size ("messages" in implementation terminology). The number of available messages is either --job-workers-shared-messages or 2 * N, where N is the total number of workers (the -f option).
As illustrated above, a message is acquired to send a request or a response. If all messages are busy, a job can't be sent (memory limit exceeded). That's why the maximum number of jobs mostly depends on the number of shared memory pieces, not on the jobs pipe size.
If you launch lots of jobs without calling wait(), all pieces will become busy, and no more jobs can be launched. As jobs are shared across all workers, if some bad PHP code launches lots of jobs, it will break job launching for everyone else.
If a response is just 8 bytes, a message of 512 KBytes is acquired nevertheless.
To be able to send more than 512 KBytes of data, some bigger pieces also exist, preallocated as well: N/2 pieces of 1 MB, N/4 of 2 MB, N/8 of 4 MB, N/16 of 8 MB, N/32 of 16 MB, N/64 of 32 MB, N/128 of 64 MB. For example, with -f 100 that means 50 pieces of 1 MB, 25 of 2 MB, and so on. For small N (during development or on low-capacity machines), pieces of 16 MB and bigger may not exist.
If a request/response doesn't fit into 512 KBytes, a 1 MB message is attached; if it still doesn't fit, KPHP takes a 2 MB one, and so on. As there are fewer and fewer bigger pieces, launching 8 jobs with 512 KB inputs may work, while with 1 MB inputs it may not succeed.
Now we see what the overhead is made up of:
Copying data is not just "memcpy", of course: it's recreation of structures and hash tables from scratch.
Giant or complex data (deep hashtables, for example) may take an excessive amount of time to copy, e.g. 2-3 ms.
But if the amount of data is small (try to target this case), copying takes noticeably less time than handleRequest().
A job worker itself just executes handleKphpJobWorkerRequest() as is.

Jobs are NOT about saving resources. Jobs are about responding to the end user as quickly as possible, even though we require more resources: additional memory and deep copying. PHP code with jobs consumes more CPU and memory. But the goal is to respond more quickly, and that's more important from the business point of view.
as is.Jobs are NOT about saving resources. Jobs are about responding to the end user as quickly as possible, though we require more resources. It’s additional memory and deep copying. PHP code with jobs consumes more CPU and memory. But the goal is to respond more quickly, it’s more important from the business point of view.