src.environment.parallelenv.parallelenv

Module Contents

Classes

API

class src.environment.parallelenv.parallelenv.ParallelEnv(envs: typing.List[gym.Env], para_mode: typing.Literal[dummy, subproc, ray, ray-subproc] = 'dummy', asynchronous: typing.Literal[None, idle, restart, continue] = None, num_cpus: typing.Optional[typing.Union[int, None]] = None, num_gpus: int = 0, no_warning=True)[source]

Initialization

An integrated parallel Environment.

Parameters:
  • envs – The list of Gym-style envs to be run in parallel.

  • para_mode

    The parallelization mode, one of:

    • dummy (sequential processing)

    • subproc (multi-processing parallel)

    • ray (parallel with Ray)

    • ray-subproc (hybrid parallel which uses subproc envs under each ray worker)

  • asynchronous

    Whether (and how) to use asynchronous processing for sub envs whose episodes have different lengths.

    • None means all sub envs are terminated as soon as any one of them is done; after that, every action returns a None state, a None reward, done set to True, and an empty info;

    • idle means the envs that are still running return step results normally, while the results of the done sub envs are replaced with the None results described above;

    • restart means an env is reset immediately when it is done; its returned state is the first state after the reset, the last state before the reset can be found in info['ended_state'], and the flag info['reset'] is True;

    • continue means all envs keep receiving actions and returning results normally even after they are done; the processing logic is left to the user.

  • num_cpus – The number of CPU cores assigned to this parallel environment; defaults to all cores.

  • no_warning – Whether to suppress warnings from the envs; defaults to True (warnings are not shown).
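The four asynchronous modes are easier to compare side by side in code. The following is only a sequential sketch of the semantics described above, not the implementation of ParallelEnv: `CountdownEnv`, `step_all`, and `NONE_RESULT` are hypothetical names introduced for illustration, and whether `restart` reports done as True on the step that triggers the reset is an assumption.

```python
from typing import Any, Dict, List, Optional, Tuple

StepResult = Tuple[Any, Optional[float], bool, Dict[str, Any]]

# The "None result" from the parameter description:
# None state, None reward, True done, empty info.
NONE_RESULT: StepResult = (None, None, True, {})


class CountdownEnv:
    """Hypothetical toy env whose episode lasts `length` steps."""

    def __init__(self, length: int) -> None:
        self.length = length
        self.t = 0

    def reset(self) -> int:
        self.t = 0
        return self.t

    def step(self, action: Any) -> StepResult:
        self.t += 1
        return self.t, 1.0, self.t >= self.length, {}


def step_all(
    envs: List[CountdownEnv],
    actions: List[Any],
    done: List[bool],
    mode: Optional[str],
) -> List[StepResult]:
    """Step every env once, imitating the documented `asynchronous` modes.

    `done` holds the per-env done flags from earlier steps and is updated in place.
    """
    # None: once any env has finished, every env returns the None result.
    if mode is None and any(done):
        return [NONE_RESULT for _ in envs]

    results: List[StepResult] = []
    for i, (env, action) in enumerate(zip(envs, actions)):
        # idle: finished envs return the None result, the others step normally.
        if mode == "idle" and done[i]:
            results.append(NONE_RESULT)
            continue

        state, reward, is_done, info = env.step(action)
        if mode == "restart" and is_done:
            # restart: keep the final state in info and return the first
            # state after the reset (the done flag is passed through as-is,
            # which is an assumption of this sketch).
            info = dict(info, ended_state=state, reset=True)
            state = env.reset()
        else:
            # continue (and the first finishing step of the other modes):
            # the env's own result is returned unchanged.
            done[i] = done[i] or is_done
        results.append((state, reward, is_done, info))
    return results
```

With envs of episode length 1 and 3, `idle` returns `NONE_RESULT` for the first env from the second step on, while `restart` returns state 0 again together with `info['ended_state'] == 1`.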

__VectorEnvOption[source]

None

__len__()[source]

get_env_attr(key: str, id: Optional[Union[int, List[int], numpy.ndarray]] = None)[source]

Get an attribute value from all envs or from specific envs (selected through id). If the attribute does not exist in an env, None is returned for that env.

set_env_attr(key: str, value: Any, id: Optional[Union[int, List[int], numpy.ndarray]] = None)[source]

Set the value of an attribute in all envs or in specific envs (selected through id).
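The behaviour of the two attribute helpers can be pictured with plain `getattr` and `setattr`. This is a sequential sketch under stated assumptions, not the library's code: `select_envs`, `get_attr`, and `set_attr` are hypothetical helpers, the selector is called `ids` here to avoid shadowing Python's built-in `id`, and returning a list even for a single integer index is an assumption.

```python
from types import SimpleNamespace
from typing import Any, Iterable, List, Optional, Union

Ids = Optional[Union[int, Iterable[int]]]


def select_envs(envs: List[Any], ids: Ids = None) -> List[Any]:
    """Return all envs (ids is None), one env (int), or the listed envs."""
    if ids is None:
        return list(envs)
    if isinstance(ids, int):
        return [envs[ids]]
    return [envs[int(i)] for i in ids]


def get_attr(envs: List[Any], key: str, ids: Ids = None) -> List[Any]:
    """Read `key` from the selected envs; a missing attribute yields None."""
    return [getattr(env, key, None) for env in select_envs(envs, ids)]


def set_attr(envs: List[Any], key: str, value: Any, ids: Ids = None) -> None:
    """Write `value` to attribute `key` on the selected envs."""
    for env in select_envs(envs, ids):
        setattr(env, key, value)


# Stand-in envs: only the first one defines `level`.
toy_envs = [SimpleNamespace(level=1), SimpleNamespace()]
levels_before = get_attr(toy_envs, "level")
set_attr(toy_envs, "level", 5, ids=[1])
levels_after = get_attr(toy_envs, "level")
```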

has_done()[source]

Whether at least one env is done.

all_done()[source]

Whether all envs are done.

customized_method(env_method: str, data=None, id: Optional[Union[int, List[int], numpy.ndarray]] = None)[source]

If the user declares a method named [env_method] in the env that takes its arguments as a dictionary [data], this method calls it in parallel across the envs.

# For instance, to run a ``func`` method that requires an argument named ``x``
# on 8 envs out of a batch of 16, construct a list of 8 argument dictionaries:
data = [{'x': ...}, {...}, ...]
# and the ids of the envs that should run the func:
id = [0, 1, 2, ...]
# then call customized_method:
results = VectorEnv.customized_method("func", data, id)

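To make the pairing between data and id concrete, the dispatch can be sketched sequentially as below. This is an illustration, not the library's implementation: `ToyEnv` and `call_env_method` are hypothetical, and unpacking each dictionary as keyword arguments is an assumption about how the arguments are passed.

```python
from typing import Any, Dict, List, Optional


class ToyEnv:
    """Hypothetical env exposing a user-defined method `func`."""

    def __init__(self, offset: int) -> None:
        self.offset = offset

    def func(self, x: int) -> int:
        return x + self.offset


def call_env_method(
    envs: List[Any],
    env_method: str,
    data: Optional[List[Dict[str, Any]]] = None,
    ids: Optional[List[int]] = None,
) -> List[Any]:
    """Call `env_method` on the selected envs, one argument dict per env."""
    ids = list(range(len(envs))) if ids is None else list(ids)
    data = [{} for _ in ids] if data is None else data
    if len(data) != len(ids):
        raise ValueError("data and ids must have the same length")
    # Assumption: each dictionary is unpacked as keyword arguments.
    return [getattr(envs[i], env_method)(**kwargs) for i, kwargs in zip(ids, data)]


toy_envs = [ToyEnv(offset) for offset in range(4)]
# Run `func` on envs 1 and 3 only, each with its own value of x.
results = call_env_method(toy_envs, "func", [{"x": 10}, {"x": 20}], [1, 3])
```

The i-th dictionary in data goes to the env named by the i-th entry of ids, so the two lists must have the same length.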
reset(id: Optional[Union[int, List[int], numpy.ndarray]] = None)[source]

Reset the envs whose indices are in [id]; by default, all envs are reset.

seed(seed: Optional[Union[int, List[int], numpy.ndarray]])[source]

Set the seed for all envs.

step(action, id: Optional[Union[int, List[int], numpy.ndarray]] = None, align: Literal[batch, item] = 'item')[source]

Take a step in the envs.

Parameters:
  • action – the actions to take.

  • id – the indices of the envs that take the actions, in one-to-one correspondence with action; by default, all envs take a step.

  • align – the alignment mode of the output: batch returns a batch of 4-item tuples [<state, reward, done, info>, <…>, …], while item returns 4 batched items <states, rewards, dones, infos>.
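The two align layouts carry the same data and differ only in how it is grouped. The sketch below converts between them with plain Python lists; `to_item` and `to_batch` are hypothetical helpers, and the container types that step actually returns (lists, tuples, or arrays) are not specified here.

```python
from typing import Any, Dict, List, Tuple

StepTuple = Tuple[Any, float, bool, Dict[str, Any]]


def to_item(batch: List[StepTuple]) -> Tuple[List[Any], List[float], List[bool], List[dict]]:
    """'batch' layout -> 'item' layout: four parallel lists, one per field.

    Expects a non-empty batch.
    """
    states, rewards, dones, infos = zip(*batch)
    return list(states), list(rewards), list(dones), list(infos)


def to_batch(states, rewards, dones, infos) -> List[StepTuple]:
    """'item' layout -> 'batch' layout: one 4-item tuple per env."""
    return list(zip(states, rewards, dones, infos))


# Two envs in 'batch' layout: one <state, reward, done, info> tuple each.
batch_layout = [("s0", 1.0, False, {}), ("s1", 0.5, True, {"k": 1})]
item_layout = to_item(batch_layout)
```

The item layout is convenient when each field is consumed as a whole (for example, all rewards at once), while the batch layout keeps each env's result together.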

__process_align(obs_list, rew_list, done_list, info_list, align: Literal[batch, item] = 'item')[source]

__update_done(done_list)[source]

rollout()[source]

close()[source]

Close the envs to avoid memory or process leaks.