Source code for easy_rec.python.utils.test_utils

# -*- encoding:utf-8 -*-
# Copyright (c) Alibaba, Inc. and its affiliates.
"""Contains functions which are convenient for unit testing.

isort:skip_file
"""
from future import standard_library
standard_library.install_aliases()
import yaml
import glob
import json
import logging
import os
import random
import shutil
import string
import subprocess
import time
import six
from multiprocessing import Process
from subprocess import getstatusoutput
from tensorflow.python.platform import gfile
import numpy as np
from easy_rec.python.protos.train_pb2 import DistributionStrategy
from easy_rec.python.utils import config_util
from easy_rec.python.protos.pipeline_pb2 import EasyRecConfig
from easy_rec.python.utils.io_util import read_data_from_json_path
from easy_rec.python.utils import constant

TEST_DIR = './tmp/easy_rec_test'

# parallel run of tests could take more time
TEST_TIME_OUT = int(os.environ.get('TEST_TIME_OUT', 1800))


[docs]def get_hdfs_tmp_dir(test_dir): """Create a randomly of directory in HDFS.""" tmp_name = ''.join( [random.choice(string.ascii_letters + string.digits) for i in range(8)]) assert isinstance(test_dir, str) test_rand_dir = os.path.join(test_dir, tmp_name) gfile.MkDir(test_rand_dir) return test_rand_dir
[docs]def proc_wait(proc, timeout=1200): t0 = time.time() while proc.poll() is None and time.time() - t0 < timeout: time.sleep(1) if proc.poll() is None: logging.warning('proc[pid=%d] timeout[%d], will kill the proc' % (proc.pid, timeout)) proc.terminate() while proc.poll() is None: time.sleep(1)
[docs]def get_tmp_dir(): max_retry = 5 while max_retry > 0: tmp_name = ''.join([ random.choice(string.ascii_letters + string.digits) for i in range(12) ]) if os.environ.get('TEST_DIR', '') != '': global TEST_DIR TEST_DIR = os.environ['TEST_DIR'] dir_name = os.path.join(TEST_DIR, tmp_name) if not os.path.exists(dir_name): os.makedirs(dir_name) return dir_name else: max_retry -= 1 raise RuntimeError('Failed to get_tmp_dir: max_retry=%d' % max_retry)
[docs]def clear_all_tmp_dirs(): shutil.rmtree(TEST_DIR)
[docs]def set_gpu_id(gpu_id_str): env = os.environ if gpu_id_str is None: env['CUDA_VISIBLE_DEVICES'] = '' else: env['CUDA_VISIBLE_DEVICES'] = gpu_id_str
[docs]def get_available_gpus(): if 'TEST_DEVICES' in os.environ: gpus = os.environ['TEST_DEVICES'].split(',') else: gpus = glob.glob('/dev/nvidia[0-9]*') gpus = [gpu.replace('/dev/nvidia', '') for gpu in gpus] logging.info('available gpus %s' % gpus) return gpus
[docs]def run_cmd(cmd_str, log_file, env=None): """Run a shell cmd.""" cmd_str = cmd_str.replace('\r', ' ').replace('\n', ' ') logging.info('RUNCMD: %s > %s 2>&1 ' % (cmd_str, log_file)) with open(log_file, 'w') as lfile: proc = subprocess.Popen( cmd_str, stdout=lfile, stderr=subprocess.STDOUT, shell=True, env=env) if six.PY2: # for debug purpose proc.args = cmd_str return proc
[docs]def RunAsSubprocess(f): """Function dectorator to run function in subprocess. if a function will start a tf session. Because tensorflow gpu memory will not be cleared until the process exit. """ def wrapped_f(*args, **kw): p = Process(target=f, args=args, kwargs=kw) p.start() p.join(timeout=600) assert p.exitcode == 0, 'subprocess run failed: %s' % f.__name__ return wrapped_f
[docs]def clean_up(test_dir): if test_dir is not None: shutil.rmtree(test_dir) # reset to cpu mode set_gpu_id(None)
[docs]def clean_up_hdfs(test_dir): if gfile.Exists(test_dir): gfile.DeleteRecursively(test_dir) set_gpu_id(None)
def _replace_data_for_test(data_path): """Replace real data with test data.""" test_data = {} change = False releated_datasets = [] for k, config in test_data.items(): if k in data_path: releated_datasets.append(k) # if there are multiple keyword detected, use the longest one if len(releated_datasets) > 0: score = [len(k) for k in releated_datasets] best_match = np.argmax(score) data_path = test_data[releated_datasets[best_match]] change = True assert change, 'Failed to replace data with test data' return data_path def _load_config_for_test(pipeline_config_path, test_dir, total_steps=50, num_epochs=0): pipeline_config = config_util.get_configs_from_pipeline_file( pipeline_config_path) train_config = pipeline_config.train_config eval_config = pipeline_config.eval_config data_config = pipeline_config.data_config train_config.num_steps = total_steps # change model_dir pipeline_config.model_dir = os.path.join(test_dir, 'train') logging.info('test_model_dir %s' % pipeline_config.model_dir) eval_config.num_examples = max(10, data_config.batch_size) data_config.num_epochs = num_epochs return pipeline_config def _load_config_for_distribute_eval(pipeline_config_path, test_dir): pipeline_config = config_util.get_configs_from_pipeline_file( pipeline_config_path) pipeline_config.model_dir = test_dir logging.info('test_model_dir %s' % pipeline_config.model_dir) return pipeline_config
[docs]def test_datahub_train_eval(pipeline_config_path, odps_oss_config, test_dir, process_pipeline_func=None, total_steps=50, post_check_func=None): gpus = get_available_gpus() if len(gpus) > 0: set_gpu_id(gpus[0]) else: set_gpu_id(None) if not isinstance(pipeline_config_path, EasyRecConfig): logging.info('testing pipeline config %s' % pipeline_config_path) if 'TF_CONFIG' in os.environ: del os.environ['TF_CONFIG'] if isinstance(pipeline_config_path, EasyRecConfig): pipeline_config = pipeline_config_path else: pipeline_config = _load_config_for_test(pipeline_config_path, test_dir, total_steps) pipeline_config.train_config.train_distribute = 0 pipeline_config.train_config.num_gpus_per_worker = 1 pipeline_config.train_config.sync_replicas = False pipeline_config.datahub_train_input.akId = odps_oss_config.dh_id pipeline_config.datahub_train_input.akSecret = odps_oss_config.dh_key pipeline_config.datahub_train_input.region = odps_oss_config.dh_endpoint pipeline_config.datahub_train_input.project = odps_oss_config.dh_project pipeline_config.datahub_train_input.topic = odps_oss_config.dh_topic pipeline_config.datahub_eval_input.akId = odps_oss_config.dh_id pipeline_config.datahub_eval_input.akSecret = odps_oss_config.dh_key pipeline_config.datahub_eval_input.region = odps_oss_config.dh_endpoint pipeline_config.datahub_eval_input.project = odps_oss_config.dh_project pipeline_config.datahub_eval_input.topic = odps_oss_config.dh_topic if process_pipeline_func is not None: assert callable(process_pipeline_func) pipeline_config = process_pipeline_func(pipeline_config) config_util.save_pipeline_config(pipeline_config, test_dir) test_pipeline_config_path = os.path.join(test_dir, 'pipeline.config') train_cmd = 'python -m easy_rec.python.train_eval --pipeline_config_path %s' % \ test_pipeline_config_path proc = run_cmd(train_cmd, '%s/log_%s.txt' % (test_dir, 'master')) proc_wait(proc, timeout=TEST_TIME_OUT) if proc.returncode != 0: logging.warning( 'train %s failed[pid=%d][code=%d][args=%s]' % (test_pipeline_config_path, proc.pid, proc.returncode, proc.args)) return False if post_check_func: return post_check_func(pipeline_config) return True
def _Load_config_for_test_eval(pipeline_config_path): pipeline_config = config_util.get_configs_from_pipeline_file( pipeline_config_path) return pipeline_config
[docs]def test_single_train_eval(pipeline_config_path, test_dir, process_pipeline_func=None, hyperparam_str='', total_steps=50, post_check_func=None, check_mode=False, fine_tune_checkpoint=None, extra_cmd_args=None, timeout=-1): gpus = get_available_gpus() if len(gpus) > 0: set_gpu_id(gpus[0]) else: set_gpu_id(None) if not isinstance(pipeline_config_path, EasyRecConfig): logging.info('testing pipeline config %s' % pipeline_config_path) if 'TF_CONFIG' in os.environ: del os.environ['TF_CONFIG'] if isinstance(pipeline_config_path, EasyRecConfig): pipeline_config = pipeline_config_path else: pipeline_config = _load_config_for_test(pipeline_config_path, test_dir, total_steps) pipeline_config.train_config.train_distribute = 0 pipeline_config.train_config.num_gpus_per_worker = 1 pipeline_config.train_config.sync_replicas = False if process_pipeline_func is not None: assert callable(process_pipeline_func) pipeline_config = process_pipeline_func(pipeline_config) config_util.save_pipeline_config(pipeline_config, test_dir) test_pipeline_config_path = os.path.join(test_dir, 'pipeline.config') train_cmd = 'python -m easy_rec.python.train_eval --pipeline_config_path=' + test_pipeline_config_path if hyperparam_str: train_cmd += ' --edit_config_json=\'%s\'' % hyperparam_str if fine_tune_checkpoint: train_cmd += ' --fine_tune_checkpoint %s' % fine_tune_checkpoint if check_mode: train_cmd += ' --check_mode' if extra_cmd_args: train_cmd += ' ' train_cmd += extra_cmd_args proc = run_cmd(train_cmd, '%s/log_%s.txt' % (test_dir, 'master')) proc_wait(proc, timeout=TEST_TIME_OUT if timeout < 0 else timeout) if proc.returncode != 0: logging.error('train %s failed' % test_pipeline_config_path) return False if post_check_func: return post_check_func(pipeline_config) return True
[docs]def test_single_pre_check(pipeline_config_path, test_dir): gpus = get_available_gpus() if len(gpus) > 0: set_gpu_id(gpus[0]) else: set_gpu_id(None) if not isinstance(pipeline_config_path, EasyRecConfig): logging.info('testing pipeline config %s' % pipeline_config_path) if 'TF_CONFIG' in os.environ: del os.environ['TF_CONFIG'] if isinstance(pipeline_config_path, EasyRecConfig): pipeline_config = pipeline_config_path else: pipeline_config = _load_config_for_test(pipeline_config_path, test_dir) pipeline_config.train_config.train_distribute = 0 pipeline_config.train_config.num_gpus_per_worker = 1 pipeline_config.train_config.sync_replicas = False config_util.save_pipeline_config(pipeline_config, test_dir) test_pipeline_config_path = os.path.join(test_dir, 'pipeline.config') train_cmd = 'python -m easy_rec.python.tools.pre_check --pipeline_config_path %s ' % ( test_pipeline_config_path) proc = run_cmd(train_cmd, '%s/log_%s.txt' % (test_dir, 'master')) proc_wait(proc, timeout=TEST_TIME_OUT) if proc.returncode != 0: logging.error('train %s failed' % test_pipeline_config_path) return False return True
[docs]def test_single_predict(test_dir, input_path, output_path, saved_model_dir): gpus = get_available_gpus() if len(gpus) > 0: set_gpu_id(gpus[0]) else: set_gpu_id(None) predict_cmd = 'python -m easy_rec.python.predict --input_path %s --output_path %s --saved_model_dir %s' % ( input_path, output_path, saved_model_dir) proc = run_cmd(predict_cmd, '%s/log_%s.txt' % (test_dir, 'master')) proc_wait(proc, timeout=TEST_TIME_OUT) if proc.returncode != 0: logging.error('predict failed') return False return True
[docs]def test_feature_selection(pipeline_config): model_dir = pipeline_config.model_dir pipeline_config_path = os.path.join(model_dir, 'pipeline.config') output_dir = os.path.join(model_dir, 'feature_selection') cmd = 'python -m easy_rec.python.tools.feature_selection --config_path %s ' \ '--output_dir %s --topk 5 --visualize true' % (pipeline_config_path, output_dir) proc = run_cmd(cmd, os.path.join(model_dir, 'log_feature_selection.txt')) proc_wait(proc, timeout=TEST_TIME_OUT) if proc.returncode != 0: logging.error('feature selection %s failed' % pipeline_config_path) return False return True
[docs]def yaml_replace(train_yaml_path, pipline_config_path, test_pipeline_config_path, test_export_dir=None): with open(train_yaml_path, 'r', encoding='utf-8') as _file: sample = _file.read() x = yaml.load(sample) _command = x['app']['command'] if test_export_dir is not None: _command = _command.replace(pipline_config_path, test_pipeline_config_path).replace( '{EXPOERT_DIR}', test_export_dir) else: _command = _command.replace(pipline_config_path, test_pipeline_config_path) x['app']['command'] = _command with open(train_yaml_path, 'w', encoding='utf-8') as _file: yaml.dump(x, _file)
[docs]def test_hdfs_train_eval(pipeline_config_path, train_yaml_path, test_dir, process_pipeline_func=None, hyperparam_str='', total_steps=2000): gpus = get_available_gpus() if len(gpus) > 0: set_gpu_id(gpus[0]) else: set_gpu_id(None) logging.info('testing pipeline config %s' % pipeline_config_path) logging.info('train_yaml_path %s' % train_yaml_path) if 'TF_CONFIG' in os.environ: del os.environ['TF_CONFIG'] pipeline_config = _load_config_for_test(pipeline_config_path, test_dir, total_steps) logging.info('model_dir in pipeline_config has been modified') pipeline_config.train_config.train_distribute = 0 pipeline_config.train_config.num_gpus_per_worker = 1 pipeline_config.train_config.sync_replicas = False if process_pipeline_func is not None: assert callable(process_pipeline_func) pipeline_config = process_pipeline_func(pipeline_config) config_util.save_pipeline_config(pipeline_config, test_dir) test_pipeline_config_path = os.path.join(test_dir, 'pipeline.config') yaml_replace(train_yaml_path, pipeline_config_path, test_pipeline_config_path) logging.info('test_pipeline_config_path is %s' % test_pipeline_config_path) train_cmd = 'el_submit -yaml %s' % train_yaml_path proc = subprocess.Popen(train_cmd.split(), stderr=subprocess.STDOUT) proc_wait(proc, timeout=TEST_TIME_OUT) if proc.returncode != 0: logging.error('train %s failed' % test_pipeline_config_path) logging.error('train_yaml %s failed' % train_yaml_path) return proc.returncode == 0
[docs]def test_hdfs_eval(pipeline_config_path, eval_yaml_path, test_dir, process_pipeline_func=None, hyperparam_str=''): gpus = get_available_gpus() if len(gpus) > 0: set_gpu_id(gpus[0]) else: set_gpu_id(None) logging.info('testing export pipeline config %s' % pipeline_config_path) logging.info('eval_yaml_path %s' % eval_yaml_path) if 'TF_CONFIG' in os.environ: del os.environ['TF_CONFIG'] pipeline_config = _Load_config_for_test_eval(pipeline_config_path) if process_pipeline_func is not None: assert callable(process_pipeline_func) pipeline_config = process_pipeline_func(pipeline_config) config_util.save_pipeline_config(pipeline_config, test_dir) test_pipeline_config_path = os.path.join(test_dir, 'pipeline.config') yaml_replace(eval_yaml_path, pipeline_config_path, test_pipeline_config_path) logging.info('test_pipeline_config_path is %s' % test_pipeline_config_path) eval_cmd = 'el_submit -yaml %s' % eval_yaml_path proc = subprocess.Popen(eval_cmd.split(), stderr=subprocess.STDOUT) proc_wait(proc, timeout=TEST_TIME_OUT) if proc.returncode != 0: logging.error('eval %s failed' % test_pipeline_config_path) logging.error('eval_yaml %s failed' % eval_yaml_path) return proc.returncode == 0
[docs]def test_hdfs_export(pipeline_config_path, export_yaml_path, test_dir, process_pipeline_func=None, hyperparam_str=''): gpus = get_available_gpus() if len(gpus) > 0: set_gpu_id(gpus[0]) else: set_gpu_id(None) logging.info('testing export pipeline config %s' % pipeline_config_path) logging.info('export_yaml_path %s' % export_yaml_path) if 'TF_CONFIG' in os.environ: del os.environ['TF_CONFIG'] pipeline_config = _Load_config_for_test_eval(pipeline_config_path) if process_pipeline_func is not None: assert callable(process_pipeline_func) pipeline_config = process_pipeline_func(pipeline_config) config_util.save_pipeline_config(pipeline_config, test_dir) test_pipeline_config_path = os.path.join(test_dir, 'pipeline.config') test_export_path = os.path.join(test_dir, 'export_dir') yaml_replace(export_yaml_path, pipeline_config_path, test_pipeline_config_path, test_export_path) logging.info('test_pipeline_config_path is %s' % test_pipeline_config_path) eval_cmd = 'el_submit -yaml %s' % export_yaml_path proc = subprocess.Popen(eval_cmd.split(), stderr=subprocess.STDOUT) proc_wait(proc, timeout=TEST_TIME_OUT) if proc.returncode != 0: logging.error('export %s failed' % test_pipeline_config_path) logging.error('export_yaml %s failed' % export_yaml_path) return proc.returncode == 0
def _ports_in_use(ports): ports_str = '' for i, port in enumerate(ports): if i > 0: ports_str += '|' ports_str += '0.0.0.0:%d|127.0.0.1:%d' % (port, port) stat, output = getstatusoutput('netstat -tlnp | grep -E %s' % ports_str) return stat == 0
[docs]def get_ports_base(num_worker): port_base = int(os.environ.get('PORT_BASE', 10000)) num_try = 10 for i in range(num_try): ports = np.random.randint(port_base, port_base + 5000, size=num_worker) if not _ports_in_use(ports): return ports logging.info('ports %s in use, retry...' % ports)
def _get_ports(num_worker): # port queue to deals with port conflicts when multiple # test cases run in parallel if 'ports' in os.environ: ports = os.environ['ports'] port_arr = [int(x) for x in ports.split(',')] assert len(port_arr) >= num_worker, 'not enough ports: %s, required: %d'\ % (ports, num_worker) return port_arr[:num_worker] else: return get_ports_base(num_worker) def _ps_worker_train(pipeline_config_path, test_dir, num_worker, num_evaluator=0, fit_on_eval=False, fit_on_eval_steps=None): gpus = get_available_gpus() # not enough gpus, run on cpu only if len(gpus) < num_worker: gpus = [None] * num_worker ports = _get_ports(num_worker + 1) chief_or_master = 'master' if num_evaluator == 0 else 'chief' cluster = { chief_or_master: ['localhost:%d' % ports[0]], 'worker': ['localhost:%d' % ports[i] for i in range(1, num_worker)], 'ps': ['localhost:%d' % ports[-1]] } tf_config = {'cluster': cluster} procs = {} tf_config['task'] = {'type': chief_or_master, 'index': 0} os.environ['TF_CONFIG'] = json.dumps(tf_config) set_gpu_id(gpus[0]) train_cmd = 'python -m easy_rec.python.train_eval --pipeline_config_path %s' % pipeline_config_path if fit_on_eval: train_cmd += ' --fit_on_eval' if fit_on_eval_steps is not None: train_cmd += ' --fit_on_eval_steps ' + str(int(fit_on_eval_steps)) procs[chief_or_master] = run_cmd( train_cmd, '%s/log_%s.txt' % (test_dir, chief_or_master)) tf_config['task'] = {'type': 'ps', 'index': 0} os.environ['TF_CONFIG'] = json.dumps(tf_config) set_gpu_id('') procs['ps'] = run_cmd(train_cmd, '%s/log_%s.txt' % (test_dir, 'ps')) for idx in range(num_worker - 1): tf_config['task'] = {'type': 'worker', 'index': idx} os.environ['TF_CONFIG'] = json.dumps(tf_config) set_gpu_id(gpus[idx + 1]) worker_name = 'worker_%d' % idx procs[worker_name] = run_cmd(train_cmd, '%s/log_%s.txt' % (test_dir, worker_name)) if num_evaluator > 0: tf_config['task'] = {'type': 'evaluator', 'index': 0} os.environ['TF_CONFIG'] = json.dumps(tf_config) set_gpu_id('') procs['evaluator'] = run_cmd(train_cmd, '%s/log_%s.txt' % (test_dir, 'evaluator')) return procs def _ps_worker_distribute_eval(pipeline_config_path, checkpoint_path, test_dir, num_worker, num_evaluator=0): gpus = get_available_gpus() # not enough gpus, run on cpu only if len(gpus) < num_worker: gpus = [None] * num_worker ports = _get_ports(num_worker + 1) chief_or_master = 'master' if num_evaluator == 0 else 'chief' cluster = { chief_or_master: ['localhost:%d' % ports[0]], 'worker': ['localhost:%d' % ports[i] for i in range(1, num_worker)], 'ps': ['localhost:%d' % ports[-1]] } tf_config = {'cluster': cluster} procs = {} tf_config['task'] = {'type': chief_or_master, 'index': 0} os.environ['TF_CONFIG'] = json.dumps(tf_config) os.environ[constant.SORT_COL_BY_NAME] = '1' set_gpu_id(gpus[0]) train_cmd = 'python -m easy_rec.python.eval --pipeline_config_path {} --checkpoint_path {} \ --distribute_eval True --eval_result_path distribute_eval_result.txt'.format( pipeline_config_path, checkpoint_path) procs[chief_or_master] = run_cmd( train_cmd, '%s/distribute_eval_log_%s.txt' % (test_dir, chief_or_master)) tf_config['task'] = {'type': 'ps', 'index': 0} os.environ['TF_CONFIG'] = json.dumps(tf_config) set_gpu_id('') procs['ps'] = run_cmd(train_cmd, '%s/distribute_eval_log_%s.txt' % (test_dir, 'ps')) for idx in range(num_worker - 1): tf_config['task'] = {'type': 'worker', 'index': idx} os.environ['TF_CONFIG'] = json.dumps(tf_config) set_gpu_id(gpus[idx + 1]) worker_name = 'worker_%d' % idx procs[worker_name] = run_cmd( train_cmd, '%s/distribute_eval_log_%s.txt' % (test_dir, worker_name)) if num_evaluator > 0: tf_config['task'] = {'type': 'evaluator', 'index': 0} os.environ['TF_CONFIG'] = json.dumps(tf_config) set_gpu_id('') procs['evaluator'] = run_cmd( train_cmd, '%s/distribute_eval_log_%s.txt' % (test_dir, 'evaluator')) return procs def _multi_worker_mirror_train(pipeline_config_path, test_dir, num_worker): gpus = get_available_gpus() # not enough gpus, run on cpu only if len(gpus) < num_worker: gpus = [None] * num_worker ports = _get_ports(num_worker) tf_config = { 'cluster': { 'worker': ['localhost:%d' % ports[i] for i in range(num_worker)] } } procs = {} train_cmd = 'python -m easy_rec.python.train_eval --pipeline_config_path %s' % pipeline_config_path for idx in range(num_worker): tf_config['task'] = {'type': 'worker', 'index': idx} os.environ['TF_CONFIG'] = json.dumps(tf_config) set_gpu_id(gpus[idx]) worker_name = 'worker_%d' % idx procs[worker_name] = run_cmd(train_cmd, '%s/log_%s.txt' % (test_dir, worker_name)) return procs def _multi_worker_hvd_train(pipeline_config_path, test_dir, num_worker): gpus = get_available_gpus() # not enough gpus, run on cpu only if len(gpus) < num_worker: gpus = '' else: gpus = ','.join(gpus) set_gpu_id(gpus) ports = _get_ports(num_worker) hosts = ','.join(['localhost:%d' % ports[i] for i in range(num_worker)]) train_cmd = 'horovodrun -np %d --hosts %s python -m easy_rec.python.train_eval --pipeline_config_path %s' % ( num_worker, hosts, pipeline_config_path) proc = run_cmd(train_cmd, '%s/log_hvd.txt' % test_dir) proc_wait(proc, timeout=1200) return proc.returncode == 0
[docs]def test_distributed_train_eval(pipeline_config_path, test_dir, total_steps=50, num_evaluator=0, edit_config_json=None, use_hvd=False, fit_on_eval=False, num_epoch=0): logging.info('testing pipeline config %s' % pipeline_config_path) pipeline_config = _load_config_for_test(pipeline_config_path, test_dir, total_steps, num_epoch) if edit_config_json is not None: config_util.edit_config(pipeline_config, edit_config_json) if use_hvd: pipeline_config.train_config.sync_replicas = False if pipeline_config.train_config.train_distribute not in [ DistributionStrategy.EmbeddingParallelStrategy, DistributionStrategy.SokStrategy ]: pipeline_config.train_config.train_distribute =\ DistributionStrategy.HorovodStrategy train_config = pipeline_config.train_config config_util.save_pipeline_config(pipeline_config, test_dir) test_pipeline_config_path = os.path.join(test_dir, 'pipeline.config') task_failed = None procs = None try: if use_hvd: return _multi_worker_hvd_train(test_pipeline_config_path, test_dir, 2) if train_config.train_distribute == DistributionStrategy.NoStrategy: num_worker = 2 procs = _ps_worker_train( test_pipeline_config_path, test_dir, num_worker, num_evaluator, fit_on_eval, fit_on_eval_steps=int(total_steps // 2)) elif train_config.train_distribute == DistributionStrategy.MultiWorkerMirroredStrategy: num_worker = 2 procs = _multi_worker_mirror_train(test_pipeline_config_path, test_dir, num_worker) else: raise NotImplementedError # print proc info assert len(procs) > 0, 'processes are empty' for k, proc in procs.items(): logging.info('%s pid: %d' % (k, proc.pid)) task_finish_cnt = 0 task_has_finished = {k: False for k in procs.keys()} while True: for k, proc in procs.items(): if proc.poll() is None: if task_failed is not None: logging.error('task %s failed, %s quit' % (task_failed, k)) proc.terminate() if k != 'ps': task_has_finished[k] = True task_finish_cnt += 1 logging.info('task_finish_cnt %d' % task_finish_cnt) else: if not task_has_finished[k]: # process quit by itself if k != 'ps': task_finish_cnt += 1 task_has_finished[k] = True logging.info('task_finish_cnt %d' % task_finish_cnt) if proc.returncode != 0: logging.error('%s failed' % k) task_failed = k else: logging.info('%s run successfuly' % k) if task_finish_cnt >= num_worker: break time.sleep(1) except Exception as e: logging.error('Exception: ' + str(e)) raise e finally: if procs is not None: for k, proc in procs.items(): if proc.poll() is None: logging.info('terminate %s' % k) proc.terminate() if task_failed is not None: logging.error('train %s failed' % pipeline_config_path) return task_failed is None
[docs]def test_distribute_eval_test(cur_eval_path, test_dir): single_work_eval_path = os.path.join(cur_eval_path, 'eval_result.txt') distribute_eval_path = os.path.join(test_dir, 'distribute_eval_result.txt') if not os.path.exists(distribute_eval_path): return False single_data = read_data_from_json_path(single_work_eval_path) distribute_data = read_data_from_json_path(distribute_eval_path) single_ret = { k: single_data[k] for k in single_data.keys() if 'loss' not in k and 'step' not in k } distribute_ret = { k: distribute_data[k] for k in distribute_data.keys() if 'loss' not in k } difference_num = 0.00001 for k in single_ret.keys(): if (abs(single_ret[k] - distribute_ret[k]) > difference_num): logging.error( 'distribute_eval difference[%.8f] large than threshold[%.8f]' % (abs(single_ret[k] - distribute_ret[k]), difference_num)) return False return True
[docs]def test_distributed_eval(pipeline_config_path, checkpoint_path, test_dir, total_steps=50, num_evaluator=0): logging.info('testing pipeline config %s' % pipeline_config_path) pipeline_config = _load_config_for_distribute_eval(pipeline_config_path, test_dir) train_config = pipeline_config.train_config config_util.save_pipeline_config(pipeline_config, test_dir) test_pipeline_config_path = os.path.join(test_dir, 'pipeline.config') task_failed = None procs = None is_equal = False try: if train_config.train_distribute == DistributionStrategy.NoStrategy: num_worker = 2 procs = _ps_worker_distribute_eval(test_pipeline_config_path, checkpoint_path, test_dir, num_worker, num_evaluator) else: raise NotImplementedError # print proc info assert len(procs) > 0, 'processes are empty' for k, proc in procs.items(): logging.info('%s pid: %d' % (k, proc.pid)) task_finish_cnt = 0 task_has_finished = {k: False for k in procs.keys()} while True: for k, proc in procs.items(): if proc.poll() is None: if task_failed is not None: logging.error('task %s failed, %s quit' % (task_failed, k)) proc.terminate() if k != 'ps': task_has_finished[k] = True task_finish_cnt += 1 logging.info('task_finish_cnt %d' % task_finish_cnt) else: if not task_has_finished[k]: # process quit by itself if k != 'ps': task_finish_cnt += 1 task_has_finished[k] = True logging.info('task_finish_cnt %d' % task_finish_cnt) if proc.returncode != 0: logging.error('%s failed' % k) task_failed = k else: logging.info('%s run successfuly' % k) if task_finish_cnt >= num_worker: break time.sleep(1) is_equal = test_distribute_eval_test(checkpoint_path, test_dir) except Exception as e: logging.error('Exception: ' + str(e)) raise e finally: if procs is not None: for k, proc in procs.items(): if proc.poll() is None: logging.info('terminate %s' % k) proc.terminate() if task_failed is not None: logging.error('eval %s failed[%s]' % (pipeline_config_path, task_failed)) eval_success = (task_failed is None) and is_equal return eval_success