Multi-Party Job&Task Scheduling¶
1. Description¶
This section describes how to submit a federated learning job using FATE Flow and how to observe its execution.
2. Job submission¶
- Build a federated learning job and submit it to the scheduling system for execution
- Two configuration files are required: job dsl and job conf
- job dsl configures the running components: list, input-output relationships
- job conf configures the component execution parameters, system operation parameters
Build a federated learning job with two configuration files: job dsl and job conf, and submit it to the scheduler for execution
flow job submit [options]
Options
parameter name | required | type | description |
---|---|---|---|
-d, --dsl-path | yes | string | path to job dsl |
-c, --conf-path | yes | string | job conf's path |
Returns
parameter name | type | description |
---|---|---|
retcode | int | return code |
retmsg | string | return message |
jobId | string | Job ID |
data | dict | return data |
data.dsl_path | string | The path to the actual running dsl configuration generated by the system based on the submitted dsl content |
data.runtime_conf_on_party_path | string | The system-generated path to the actual running conf configuration for each party, based on the submitted conf content |
data.pipeline_dsl_path | string | The system-generated path to the pipeline dsl configuration |
data.board_url | string | fateboard view address |
data.model_info | dict | Model identification information |
Example
{
"data": {
"board_url": "http://127.0.0.1:8080/index.html#/dashboard?job_id=202111061608424372620&role=guest&party_id=9999",
"code": 0,
"dsl_path": "$FATE_PROJECT_BASE/jobs/202111061608424372620/job_dsl.json",
"job_id": "202111061608424372620",
"logs_directory": "$FATE_PROJECT_BASE/logs/202111061608424372620",
"message": "success",
"model_info": {
"model_id": "arbiter-10000#guest-9999#host-10000#model",
"model_version": "202111061608424372620"
},
"pipeline_dsl_path": "$FATE_PROJECT_BASE/jobs/202111061608424372620/pipeline_dsl.json",
"runtime_conf_on_party_path": "$FATE_FATE_PROJECT_BASE/jobs/202111061608424372620/guest/9999/job_runtime_on_party_conf.json",
"runtime_conf_path":"$FATE_PROJECT_BASE/jobs/202111061608424372620/job_runtime_conf.json",
"train_runtime_conf_path": "$FATE_PROJECT_BASE/jobs/202111061608424372620/train_runtime_conf.json"
},
"jobId": "202111061608424372620",
"retcode": 0,
"retmsg": "success"
}
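The response above is plain JSON, so a client script can check the return code and pick out the job id before polling the job. A minimal sketch (not part of FATE itself; the sample payload below is abridged from the example above):

```python
import json

# Parse the JSON returned by `flow job submit` and extract the fields
# described in the Returns table above (abridged sample payload).
response_text = """
{
  "retcode": 0,
  "retmsg": "success",
  "jobId": "202111061608424372620",
  "data": {"board_url": "http://127.0.0.1:8080/index.html"}
}
"""

response = json.loads(response_text)
if response["retcode"] != 0:
    # a non-zero retcode signals a failed submission
    raise RuntimeError(f"submit failed: {response['retmsg']}")
job_id = response["jobId"]
print(job_id)
```

The same pattern applies to the other `flow` subcommands, since they all return `retcode`/`retmsg`/`data`.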
3. Job DSL configuration description¶
The configuration file of DSL is in json format, in fact, the whole configuration file is a json object (dict).
3.1 Component List¶
Description The first level of this dict is components, which indicates the modules that will be used by this job.
Example
{
"components" : {
...
}
}
Each individual module is defined under "components", e.g.
"data_transform_0": {
"module": "DataTransform",
"input": {
"data": {
"data": [
"reader_0.train_data"
]
}
},
"output": {
"data": ["train"],
"model": ["model"]
}
}
All data needs to be fetched from the data store via the Reader module; note that this module only has output, no input
"reader_0": {
"module": "Reader",
"output": {
"data": ["train"]
}
}
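Because the dsl is plain JSON, the components section can also be assembled programmatically instead of written by hand. A minimal Python sketch (the `component` helper is hypothetical, not a FATE API), wiring `reader_0`'s output into `data_transform_0` as in the examples above:

```python
import json

# Hypothetical helper: build one component spec as a plain dict.
def component(module, input=None, output=None):
    spec = {"module": module}
    if input:
        spec["input"] = input
    if output:
        spec["output"] = output
    return spec

# Assemble the "components" section of a job dsl.
dsl = {
    "components": {
        "reader_0": component("Reader", output={"data": ["train"]}),
        "data_transform_0": component(
            "DataTransform",
            # reference upstream output as "<component>.<output name>"
            input={"data": {"data": ["reader_0.train"]}},
            output={"data": ["train"], "model": ["model"]},
        ),
    }
}
print(json.dumps(dsl, indent=2))
```

Serializing the dict with `json.dumps` yields a file that can be passed to `flow job submit -d`.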
3.2 Modules¶
Description Used to specify the components to be used; all supported module names can be found in the component list documentation. Example
"hetero_feature_binning_1": {
"module": "HeteroFeatureBinning",
...
}
3.3 Inputs¶
Description Upstream inputs, divided into two input types: data and model.
data input¶
Description Upstream data input, divided into four input types.
> 1. data: generally used in the data-transform module, feature_engineering module or
> evaluation module.
> 2. train_data: Generally used in the homo_lr, hetero_lr and secure_boost
> modules. If the train_data field is present, the task will be recognized as a fit task.
> 3. validate_data: If the train_data field is present, this field is optional. If you choose to
> keep this field, the data pointed to will be used as the validation set.
> 4. test_data: Used as prediction data; if provided, model input must also be provided.
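The recognition rule above can be sketched as a small function. This is illustrative only, not FATE Flow's actual implementation; the `"transform"` fallback for plain `data` input is an assumption:

```python
# Sketch of the rule above: the presence of `train_data` marks a fit task,
# while `test_data` marks a predict task.
def task_type(data_input: dict) -> str:
    if "train_data" in data_input:
        return "fit"
    if "test_data" in data_input:
        return "predict"
    return "transform"  # assumption: plain `data` input means transform/evaluate

assert task_type({"train_data": ["intersection_0.data"]}) == "fit"
assert task_type({"test_data": ["reader_1.data"]}) == "predict"
assert task_type({"data": ["reader_0.data"]}) == "transform"
```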
model input¶
Description Upstream model input, divided into two input types.
1. model: Used for model input from a component of the same type. For example, hetero_binning_0 fits a model, and hetero_binning_1 then uses the output of hetero_binning_0 for predict or transform. Code example:
"hetero_feature_binning_1": {
"module": "HeteroFeatureBinning",
"input": {
"data": {
"data": [
"data_transform_1.validate_data"
]
},
"model": [
"hetero_feature_binning_0.fit_model"
]
},
"output": {
"data": ["validate_data" ],
"model": ["eval_model"]
}
}
"hetero_feature_selection_0": {
"module": "HeteroFeatureSelection",
"input": {
"data": {
"data": [
"hetero_feature_binning_0.train"
]
},
"isometric_model": [
"hetero_feature_binning_0.output_model"
]
},
"output": {
"data": [ "train" ],
"model": ["output_model"]
}
}
3.4 Output¶
Description Output, like input, is divided into data and model output
data output¶
Description Data output, divided into four output types.
- data: general module data output
- train_data: only for Data Split
- validate_data: only for Data Split
- test_data: only for Data Split
Model Output¶
Description Model output, using model only
3.5 Component Providers¶
Since FATE-Flow version 1.7.0, the same FATE-Flow system supports loading multiple component providers, i.e. providers, each of which provides several components. The source provider of a component can be configured when submitting a job. Since FATE-Flow version 1.9.0, the provider parameters need to be configured in the job conf, as follows.
Description Specify the provider; both global specification and individual component specification are supported. If not specified, the default provider is used: fate@$FATE_VERSION
Format provider_name@provider_version
Advanced You can register a new provider through the component registration CLI. Currently supported providers: fate and fate_sql; please refer to FATE Flow Component Center
Example
{
"dsl_version": "2",
"initiator": {},
"role": {},
"job_parameters": {},
"component_parameters": {},
"provider": {
"common": {
"hetero_feature_binning_0": "fate@1.8.0"
},
"role": {
"guest": {
"0": {
"data_transform_0": "fate@1.9.0"
}
},
"host": {
"0": {
"data_transform_0": "fate@1.9.0"
}
}
}
}
}
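Since provider specs follow the `provider_name@provider_version` format described above, a simple parser can validate them before they go into the conf. An illustrative sketch (not a FATE API; the regex is an assumption about allowed characters):

```python
import re

# provider_name@provider_version, e.g. "fate@1.8.0"
PROVIDER_RE = re.compile(r"^(?P<name>\w+)@(?P<version>[\w.]+)$")

def parse_provider(spec: str):
    """Split a provider spec into (name, version), rejecting bad formats."""
    m = PROVIDER_RE.match(spec)
    if not m:
        raise ValueError(f"bad provider spec: {spec}")
    return m.group("name"), m.group("version")

print(parse_provider("fate@1.8.0"))
```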
4. Job Conf Configuration Description¶
Job Conf is used to set the information of each participant, the parameters of the job and the parameters of each component. The contents include the following.
4.1 DSL Version¶
Description Configure the dsl version; if not configured, the default is 1. Version 2 is recommended. Example
"dsl_version": "2"
4.2 Job participants¶
initiating party¶
Description The role and party_id of the job initiator. Example
"initiator": {
"role": "guest",
"party_id": 9999
}
All participants¶
Description Information about each participant. In the role field, each element represents a role and the party_ids that assume that role. The party_ids of each role are given as a list, since a task may involve multiple parties in the same role. Example
"role": {
"guest": [9999],
"host": [10000],
"arbiter": [10000]
}
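Because each role maps to a list of party ids, the full set of participants is the expansion of that mapping. An illustrative sketch using the example above:

```python
# Expand the "role" section into (role, party_id) pairs.
role_conf = {
    "guest": [9999],
    "host": [10000],
    "arbiter": [10000],
}

participants = [
    (role, party_id)
    for role, party_ids in role_conf.items()
    for party_id in party_ids
]
print(participants)
```

Note that the same party_id may appear under several roles (here 10000 is both host and arbiter).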
4.3 System operation parameters¶
Description Configure the main system parameters for job runtime
Parameter application scope policy setting¶
- Apply to all participants, use the common scope identifier
- Apply to only one participant, use the role scope identifier, use (role:)party_index to locate the specified participant; directly specified parameters take precedence over common parameters
"common": {
}
"role": {
"guest": {
"0": {
}
}
}
The parameters under common are applied to all participants, while the parameters under the role-guest-0 configuration are applied to the participant at subscript 0 of the guest role. Note that in the current version, system operation parameters applied to only one participant have not been strictly tested, so it is recommended to prefer common.
Supported system parameters¶
Configuration | Default | Supported | Description |
---|---|---|---|
job_type | train | train, predict | job type |
task_cores | 4 | positive integer | total cpu cores applied to the job |
task_parallelism | 1 | positive integer | task parallelism |
computing_partitions | number of cpu cores allocated to the task | positive integer | number of partitions in the data table at computation time |
eggroll_run | none | processors_per_node, etc. | eggroll computing engine related configuration parameters, generally do not need to be configured, from task_cores automatically calculated, if configured, task_cores parameters do not take effect |
spark_run | none | num-executors, executor-cores, etc. | spark compute engine related configuration parameters, generally do not need to be configured, automatically calculated by task_cores, if configured, task_cores parameters do not take effect |
rabbitmq_run | None | queue, exchange, etc. | Configuration parameters for rabbitmq to create queue, exchange, etc., which are generally not required and take the system defaults. |
pulsar_run | none | producer, consumer, etc. | The configuration parameters for pulsar to create producer and consumer. |
federated_status_collect_type | PUSH | PUSH, PULL | Multi-party run status collection mode, PUSH means that each participant actively reports to the initiator, PULL means that the initiator periodically pulls from each participant. |
timeout | 259200 (3 days) | positive integer | task timeout, in seconds |
auto_retries | 3 | positive integer | maximum number of retries per task failure |
model_id | - | - | The model id to be filled in for prediction tasks. |
model_version | - | - | Model version, required for prediction tasks |
- there is a certain support dependency between the computation engine and the storage engine
- developers can implement their own adapted engines, and configure the engines in runtime config
Reference configuration¶
- No need to pay attention to the computing engine; use the system default cpu allocation policy. Configuration:
"job_parameters": {
"common": {
"job_type": "train",
"task_cores": 6,
"task_parallelism": 2,
"computing_partitions": 8,
"timeout": 36000
}
}
- Use eggroll as the computing engine, specifying cpu and other parameters directly. Configuration:
"job_parameters": {
"common": {
"job_type": "train",
"eggroll_run": {
"eggroll.session.processors.per.node": 2
},
"task_parallelism": 2,
"computing_partitions": 8,
"timeout": 36000,
}
}
- Use spark as the computing engine and rabbitmq as the federation engine, specifying cpu and other parameters directly. Configuration:
"job_parameters": {
"common": {
"job_type": "train",
"spark_run": {
"num-executors": 1,
"executor-cores": 2
},
"task_parallelism": 2,
"computing_partitions": 8,
"timeout": 36000,
"rabbitmq_run": {
"queue": {
"durable": true
},
"connection": {
"heartbeat": 10000
}
}
}
}
- use spark as the computing engine and pulsar as the federation engine
"job_parameters": {
"common": {
"spark_run": {
"num-executors": 1,
"executor-cores": 2
}
}
}
4.4 Component operation parameters¶
Parameter application scope policy setting¶
- Apply to all participants, use common scope identifier
- Apply to only one participant, use the role scope identifier, use (role:)party_index to locate the specified participant, directly specified parameters have higher priority than common parameters
"commom": {
}
"role": {
"guest": {
"0": {
}
}
}
The parameters under the common configuration are applied to all participants, while the parameters under the role-guest-0 configuration are applied to the participant at subscript 0 of the guest role. Note that in the current version, component runtime parameters fully support both application scope policies.
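The precedence rule (role scope over common scope) can be sketched as a small resolver. This is illustrative only, not FATE Flow's actual implementation, and it assumes a shallow (top-level key) merge:

```python
import copy

def resolve(component_parameters, component, role, index):
    """Effective parameters for one component on one participant:
    start from the common scope, then let role-scoped values override."""
    params = copy.deepcopy(
        component_parameters.get("common", {}).get(component, {})
    )
    role_params = (
        component_parameters.get("role", {})
        .get(role, {})
        .get(str(index), {})
        .get(component, {})
    )
    params.update(role_params)  # role scope has higher priority
    return params

conf = {
    "common": {"data_transform_0": {"output_format": "dense", "with_label": False}},
    "role": {"guest": {"0": {"data_transform_0": {"with_label": True}}}},
}
print(resolve(conf, "data_transform_0", "guest", 0))
```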
Reference Configuration¶
- For the intersection_0 and hetero_lr_0 components, the runtime parameters are placed under the common scope and are applied to all participants
- The runtime parameters of the reader_0 and data_transform_0 components are configured per participant, because the input parameters are usually not consistent across participants, so these two components are usually set per participant
- The above component names are defined in the dsl configuration file
"component_parameters": {
"common": {
"intersection_0": {
"intersect_method": "raw",
"sync_intersect_ids": true,
"only_output_key": false
},
"hetero_lr_0": {
"penalty": "L2",
"optimizer": "rmsprop",
"alpha": 0.01,
"max_iter": 3,
"batch_size": 320,
"learning_rate": 0.15,
"init_param": {
"init_method": "random_uniform"
}
}
},
"role": {
"guest": {
"0": {
"reader_0": {
"table": {"name": "breast_hetero_guest", "namespace": "experiment"}
},
"data_transform_0":{
"with_label": true,
"label_name": "y",
"label_type": "int",
"output_format": "dense"
}
}
},
"host": {
"0": {
"reader_0": {
"table": {"name": "breast_hetero_host", "namespace": "experiment"}
},
"data_transform_0":{
"with_label": false,
"output_format": "dense"
}
}
}
}
}
5. Multi-Host Configuration¶
Multi-host tasks should list all host information under the role field
Example:
"role": {
"guest": [
10000
],
"host": [
10000, 10001, 10002
],
"arbiter": [
10000
]
}
The different configurations for each host should be listed separately under their respective corresponding modules
Example:
"component_parameters": {
"role": {
"host": {
"0": {
"reader_0": {
"table":
{
"name": "hetero_breast_host_0",
"namespace": "hetero_breast_host"
}
}
},
"1": {
"reader_0": {
"table":
{
"name": "hetero_breast_host_1",
"namespace": "hetero_breast_host"
}
}
},
"2": {
"reader_0": {
"table":
{
"name": "hetero_breast_host_2",
"namespace": "hetero_breast_host"
}
}
}
}
}
}
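When the per-host configurations only differ in a predictable way, as in the example above, they can be generated rather than written by hand. An illustrative sketch (the table names follow the example; treat them as hypothetical):

```python
# Generate the per-host reader_0 configuration for any number of hosts.
def host_reader_params(n_hosts, namespace="hetero_breast_host"):
    return {
        str(i): {
            "reader_0": {
                "table": {
                    "name": f"hetero_breast_host_{i}",  # one table per host
                    "namespace": namespace,
                }
            }
        }
        for i in range(n_hosts)
    }

component_parameters = {"role": {"host": host_reader_params(3)}}
print(component_parameters["role"]["host"]["2"]["reader_0"]["table"]["name"])
```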
6. Predictive Task Configuration¶
6.1 Description¶
DSL V2 does not automatically generate a prediction dsl for the training task. Users need to first deploy the required modules of the model using Flow Client.
For detailed command description, please refer to fate_flow_client
flow model deploy --model-id $model_id --model-version $model_version --cpn-list ...
Optionally, the user can add new modules to the prediction dsl, such as Evaluation
6.2 Sample¶
Training dsl:
"components": {
"reader_0": {
"module": "Reader",
"output": {
"data": [
"data"
]
}
},
"data_transform_0": {
"module": "DataTransform",
"input": {
"data": {
"data": [
"reader_0.data"
]
}
},
"output": {
"data": [
"data"
],
"model": [
"model"
]
}
},
"intersection_0": {
"module": "Intersection",
"input": {
"data": {
"data": [
"data_transform_0.data"
]
}
},
"output": {
"data":[
"data"
]
}
},
"hetero_nn_0": {
"module": "HeteroNN",
"input": {
"data": {
"train_data": [
"intersection_0.data"
]
}
},
"output": {
"data": [
"data"
],
"model": [
"model"
]
}
}
}
Prediction dsl:
"components": {
"reader_0": {
"module": "Reader",
"output": {
"data": [
"data"
]
}
},
"data_transform_0": {
"module": "DataTransform",
"input": {
"data": {
"data": [
"reader_0.data"
]
}
},
"output": {
"data": [
"data"
],
"model": [
"model"
]
}
},
"intersection_0": {
"module": "Intersection",
"input": {
"data": {
"data": [
"data_transform_0.data"
]
}
},
"output": {
"data":[
"data"
]
}
},
"hetero_nn_0": {
"module": "HeteroNN",
"input": {
"data": {
"train_data": [
"intersection_0.data"
]
}
},
"output": {
"data": [
"data"
],
"model": [
"model"
]
}
},
"evaluation_0": {
"module": "Evaluation",
"input": {
"data": {
"data": [
"hetero_nn_0.data"
]
}
},
"output": {
"data": [
"data"
]
}
}
}
7. Job reruns¶
Support for rerunning a job was added in version 1.5.0, but only failed jobs could be rerun.
Version 1.7.0 supports rerunning successful jobs as well, and you can specify which component to rerun from: the specified component and its downstream components will be rerun, while other components will not.
Rerun a job
flow job rerun [options]
Options
parameter name | required | type | description |
---|---|---|---|
-j, --job-id | yes | string | job id |
-cpn, --component-name | no | string | Specifies which component to rerun from; components that have no upstream dependency on the specified component will not be executed. If not specified, the entire job will be rerun |
--force | no | bool | The job will be rerun even if it succeeded; if not specified, a successful job will be skipped |
Returns
parameter name | type | description |
---|---|---|
retcode | int | return code |
retmsg | string | return message |
jobId | string | Job ID |
data | dict | return data |
Example
flow job rerun -j 202111031100369723120
flow job rerun -j 202111031100369723120 -cpn hetero_lr_0
flow job rerun -j 202111031100369723120 -cpn hetero_lr_0 --force
8. Job parameter update¶
In the actual production modeling process, it is necessary to constantly tune component parameters and rerun, but not all components need to be adjusted and rerun each time. Since version 1.7.0, you can update the parameters of individual components and then rerun on demand with the rerun command.
Update the job parameters
flow job parameter-update [options]
Options
parameter name | required | type | description |
---|---|---|---|
-j, --job-id | yes | string | job id |
-c, --conf-path | yes | string | The contents of the job conf that needs to be updated, no need to fill in parameters that don't need to be updated |
Returns
parameter name | type | description |
---|---|---|
retcode | int | return code |
retmsg | string | return message |
jobId | string | Job ID |
data | dict | Returns the updated job conf |
Example
Suppose we want to update some of the execution parameters of the hetero_lr_0 component of a job; the configuration file is as follows.
{
"job_parameters": {
},
"component_parameters": {
"common": {
"hetero_lr_0": {
"alpha": 0.02,
"max_iter": 5
}
}
}
}
Execute the following command to make the update take effect:
flow job parameter-update -j 202111061957421943730 -c examples/other/update_parameters.json
Execute the following command to rerun.
flow job rerun -j 202111061957421943730 -cpn hetero_lr_0 --force
9. Job scheduling policy¶
- Queuing by commit time
- Currently, only FIFO policy is supported, i.e. the scheduler will only scan the first job each time, if the first job is successful in requesting resources, it will start and get out of the queue, if the request fails, it will wait for the next round of scheduling.
10. Dependency distribution¶
Brief description:
- Support for distributing fate and python dependencies from client nodes;
- Worker nodes do not need to deploy fate;
- Only FATE on Spark supports distribution mode in the current version;
Related parameters configuration:
conf/service_conf.yaml:
dependent_distribution: true
fate_flow/settings.py
FATE_FLOW_UPDATE_CHECK = False
Description:
- dependent_distribution: dependency distribution switch, off by default. When off, you need to deploy fate on each worker node, and also fill in the spark configuration in spark-env.sh to set PYSPARK_DRIVER_PYTHON and PYSPARK_PYTHON.
- FATE_FLOW_UPDATE_CHECK: dependency check switch, off by default. When on, it automatically checks whether the fate code has changed every time a task is submitted; if it has changed, the fate code dependency is re-uploaded.