Source code for easy_rec.python.utils.test_utils

# -*- encoding:utf-8 -*-
# Copyright (c) Alibaba, Inc. and its affiliates.
"""Contains functions which are convenient for unit testing.

isort:skip_file
"""
from future import standard_library
standard_library.install_aliases()
import yaml
import glob
import json
import logging
import os
import random
import shutil
import string
import subprocess
import time
from multiprocessing import Process
from subprocess import getstatusoutput
from tensorflow.python.platform import gfile
import numpy as np
from easy_rec.python.protos.train_pb2 import DistributionStrategy
from easy_rec.python.utils import config_util
from easy_rec.python.protos.pipeline_pb2 import EasyRecConfig

TEST_DIR = './tmp/easy_rec_test'


def get_hdfs_tmp_dir(test_dir):
  """Create a randomly named directory in HDFS."""
  tmp_name = ''.join(
      [random.choice(string.ascii_letters + string.digits) for i in range(8)])
  assert isinstance(test_dir, str)
  test_rand_dir = os.path.join(test_dir, tmp_name)
  gfile.MkDir(test_rand_dir)
  return test_rand_dir


def get_tmp_dir():
  tmp_name = ''.join(
      [random.choice(string.ascii_letters + string.digits) for i in range(8)])
  if os.environ.get('TEST_DIR', '') != '':
    global TEST_DIR
    TEST_DIR = os.environ['TEST_DIR']
  dir_name = os.path.join(TEST_DIR, tmp_name)
  if os.path.exists(dir_name):
    shutil.rmtree(dir_name)
  os.makedirs(dir_name)
  return dir_name


def clear_all_tmp_dirs():
  shutil.rmtree(TEST_DIR)


def set_gpu_id(gpu_id_str):
  env = os.environ
  if gpu_id_str is None:
    env['CUDA_VISIBLE_DEVICES'] = ''
  else:
    env['CUDA_VISIBLE_DEVICES'] = gpu_id_str


def get_available_gpus():
  if 'TEST_DEVICES' in os.environ:
    gpus = os.environ['TEST_DEVICES'].split(',')
  else:
    gpus = glob.glob('/dev/nvidia[0-9]*')
    gpus = [gpu.replace('/dev/nvidia', '') for gpu in gpus]
  logging.info('available gpus %s' % gpus)
  return gpus


def run_cmd(cmd_str, log_file):
  """Run a shell command and redirect stdout/stderr to log_file."""
  cmd_str = cmd_str.replace('\r', ' ').replace('\n', ' ')
  logging.info('RUNCMD: %s > %s 2>&1 ' % (cmd_str, log_file))
  with open(log_file, 'w') as lfile:
    return subprocess.Popen(
        cmd_str, stdout=lfile, stderr=subprocess.STDOUT, shell=True)


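# Illustrative usage (a minimal sketch, not part of the original module): the
# Popen handle returned by run_cmd must be waited on and its return code
# checked by the caller. The command and log path below are hypothetical.
def _example_run_cmd():
  proc = run_cmd('echo hello world', '/tmp/log_echo.txt')
  proc.wait()
  assert proc.returncode == 0, 'command failed, see /tmp/log_echo.txt'

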
def RunAsSubprocess(f):
  """Function decorator that runs the wrapped function in a subprocess.

  Useful when a function starts a TensorFlow session, because TensorFlow
  GPU memory is not released until the process exits.
  """

  def wrapped_f(*args, **kw):
    p = Process(target=f, args=args, kwargs=kw)
    p.start()
    p.join(timeout=600)
    assert p.exitcode == 0, 'subprocess run failed: %s' % f.__name__

  return wrapped_f


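# Illustrative usage (a sketch, not part of the original module): decorating a
# helper so it runs in a child process and any GPU memory it grabs is freed on
# exit. Assumes a fork-based multiprocessing start method (the Linux default),
# since a nested function is not picklable under spawn.
def _example_run_as_subprocess():

  @RunAsSubprocess
  def _noop_task(x):
    # executes in the child process; a failed assert sets a nonzero exitcode
    assert x == 42

  _noop_task(42)

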
def clean_up(test_dir):
  if test_dir is not None:
    shutil.rmtree(test_dir)
  # reset to cpu mode
  set_gpu_id(None)


def clean_up_hdfs(test_dir):
  if gfile.Exists(test_dir):
    gfile.DeleteRecursively(test_dir)
  set_gpu_id(None)


def _replace_data_for_test(data_path):
  """Replace real data with test data."""
  # mapping from real data path keywords to test data paths; left empty here
  # and expected to be populated for actual tests
  test_data = {}
  change = False
  related_datasets = []
  for k, config in test_data.items():
    if k in data_path:
      related_datasets.append(k)

  # if multiple keywords are detected, use the longest one
  if len(related_datasets) > 0:
    score = [len(k) for k in related_datasets]
    best_match = np.argmax(score)
    data_path = test_data[related_datasets[best_match]]
    change = True
  assert change, 'Failed to replace data with test data'
  return data_path


def _load_config_for_test(pipeline_config_path, test_dir, total_steps=50):
  pipeline_config = config_util.get_configs_from_pipeline_file(
      pipeline_config_path)
  train_config = pipeline_config.train_config
  eval_config = pipeline_config.eval_config
  data_config = pipeline_config.data_config

  train_config.num_steps = total_steps
  # change model_dir
  pipeline_config.model_dir = test_dir + '/train'
  logging.info('test_model_dir %s' % pipeline_config.model_dir)
  eval_config.num_examples = max(10, data_config.batch_size)
  data_config.num_epochs = 0
  return pipeline_config


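# Illustrative sketch (not part of the original module): the longest-keyword
# match in _replace_data_for_test picks the most specific dataset. The mapping
# and path below are hypothetical.
def _example_longest_match():
  test_data = {
      'data/taobao': 'test/taobao',
      'data/taobao_ctr': 'test/taobao_ctr'
  }
  data_path = 'hdfs://ns1/data/taobao_ctr/part-0'
  related = [k for k in test_data if k in data_path]
  # both keys match; argmax over key lengths selects 'data/taobao_ctr'
  best = related[int(np.argmax([len(k) for k in related]))]
  assert test_data[best] == 'test/taobao_ctr'

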
def test_datahub_train_eval(pipeline_config_path,
                            test_dir,
                            process_pipeline_func=None,
                            hyperparam_str='',
                            total_steps=50,
                            post_check_func=None):
  gpus = get_available_gpus()
  if len(gpus) > 0:
    set_gpu_id(gpus[0])
  else:
    set_gpu_id(None)

  if not isinstance(pipeline_config_path, EasyRecConfig):
    logging.info('testing pipeline config %s' % pipeline_config_path)
  if 'TF_CONFIG' in os.environ:
    del os.environ['TF_CONFIG']

  if isinstance(pipeline_config_path, EasyRecConfig):
    pipeline_config = pipeline_config_path
  else:
    pipeline_config = _load_config_for_test(pipeline_config_path, test_dir,
                                            total_steps)

  pipeline_config.train_config.train_distribute = 0
  pipeline_config.train_config.num_gpus_per_worker = 1
  pipeline_config.train_config.sync_replicas = False
  if process_pipeline_func is not None:
    assert callable(process_pipeline_func)
    pipeline_config = process_pipeline_func(pipeline_config)
  config_util.save_pipeline_config(pipeline_config, test_dir)
  test_pipeline_config_path = os.path.join(test_dir, 'pipeline.config')
  train_cmd = 'python3 -m easy_rec.python.train_eval --pipeline_config_path %s %s' % (
      test_pipeline_config_path, hyperparam_str)
  proc = run_cmd(train_cmd, '%s/log_%s.txt' % (test_dir, 'master'))
  proc.wait()
  if proc.returncode != 0:
    logging.error('train %s failed' % test_pipeline_config_path)
    return False
  if post_check_func:
    return post_check_func(pipeline_config)
  return True


def _load_config_for_test_eval(pipeline_config_path):
  pipeline_config = config_util.get_configs_from_pipeline_file(
      pipeline_config_path)
  return pipeline_config


def test_single_train_eval(pipeline_config_path,
                           test_dir,
                           process_pipeline_func=None,
                           hyperparam_str='',
                           total_steps=50,
                           post_check_func=None):
  gpus = get_available_gpus()
  if len(gpus) > 0:
    set_gpu_id(gpus[0])
  else:
    set_gpu_id(None)

  if not isinstance(pipeline_config_path, EasyRecConfig):
    logging.info('testing pipeline config %s' % pipeline_config_path)
  if 'TF_CONFIG' in os.environ:
    del os.environ['TF_CONFIG']

  if isinstance(pipeline_config_path, EasyRecConfig):
    pipeline_config = pipeline_config_path
  else:
    pipeline_config = _load_config_for_test(pipeline_config_path, test_dir,
                                            total_steps)

  pipeline_config.train_config.train_distribute = 0
  pipeline_config.train_config.num_gpus_per_worker = 1
  pipeline_config.train_config.sync_replicas = False
  if process_pipeline_func is not None:
    assert callable(process_pipeline_func)
    pipeline_config = process_pipeline_func(pipeline_config)
  config_util.save_pipeline_config(pipeline_config, test_dir)
  test_pipeline_config_path = os.path.join(test_dir, 'pipeline.config')
  train_cmd = 'python -m easy_rec.python.train_eval --pipeline_config_path %s %s' % (
      test_pipeline_config_path, hyperparam_str)
  proc = run_cmd(train_cmd, '%s/log_%s.txt' % (test_dir, 'master'))
  proc.wait()
  if proc.returncode != 0:
    logging.error('train %s failed' % test_pipeline_config_path)
    return False
  if post_check_func:
    return post_check_func(pipeline_config)
  return True


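# Illustrative usage (a sketch, not part of the original module): running a
# short single-process train/eval inside a unit test. The config path is
# hypothetical; process_pipeline_func shows how a test can tweak the config
# before it is saved and used.
def _example_single_train_eval():
  test_dir = get_tmp_dir()

  def _set_small_batch(pipeline_config):
    pipeline_config.data_config.batch_size = 32
    return pipeline_config

  try:
    assert test_single_train_eval(
        'samples/model_config/deepfm.config',
        test_dir,
        process_pipeline_func=_set_small_batch,
        total_steps=10)
  finally:
    clean_up(test_dir)

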
def test_feature_selection(pipeline_config):
  model_dir = pipeline_config.model_dir
  pipeline_config_path = os.path.join(model_dir, 'pipeline.config')
  output_dir = os.path.join(model_dir, 'feature_selection')
  cmd = 'python -m easy_rec.python.tools.feature_selection --config_path %s ' \
        '--output_dir %s --topk 5 --visualize true' % (pipeline_config_path,
                                                       output_dir)
  proc = run_cmd(cmd, os.path.join(model_dir, 'log_feature_selection.txt'))
  proc.wait()
  if proc.returncode != 0:
    logging.error('feature selection %s failed' % pipeline_config_path)
    return False
  return True


def yaml_replace(train_yaml_path,
                 pipeline_config_path,
                 test_pipeline_config_path,
                 test_export_dir=None):
  with open(train_yaml_path, 'r', encoding='utf-8') as _file:
    sample = _file.read()
    # use SafeLoader to avoid pyyaml's unsafe default load
    x = yaml.load(sample, Loader=yaml.SafeLoader)
    _command = x['app']['command']
    if test_export_dir is not None:
      # NOTE: '{EXPOERT_DIR}' (sic) must match the placeholder spelled this
      # way in the yaml templates
      _command = _command.replace(pipeline_config_path,
                                  test_pipeline_config_path).replace(
                                      '{EXPOERT_DIR}', test_export_dir)
    else:
      _command = _command.replace(pipeline_config_path,
                                  test_pipeline_config_path)
    x['app']['command'] = _command

  with open(train_yaml_path, 'w', encoding='utf-8') as _file:
    yaml.dump(x, _file)


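# Illustrative sketch (not part of the original module): yaml_replace expects a
# submit yaml with an app.command entry. This builds a minimal such file and
# rewrites its config path; all paths here are hypothetical.
def _example_yaml_replace():
  yaml_path = os.path.join(get_tmp_dir(), 'train.yaml')
  with open(yaml_path, 'w', encoding='utf-8') as f:
    yaml.dump(
        {
            'app': {
                'command': 'python -m easy_rec.python.train_eval '
                           '--pipeline_config_path samples/dwd.config'
            }
        }, f)
  # the command now points at the test copy of the pipeline config
  yaml_replace(yaml_path, 'samples/dwd.config', '/tmp/test/pipeline.config')

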
def test_hdfs_train_eval(pipeline_config_path,
                         train_yaml_path,
                         test_dir,
                         process_pipeline_func=None,
                         hyperparam_str='',
                         total_steps=2000):
  gpus = get_available_gpus()
  if len(gpus) > 0:
    set_gpu_id(gpus[0])
  else:
    set_gpu_id(None)
  logging.info('testing pipeline config %s' % pipeline_config_path)
  logging.info('train_yaml_path %s' % train_yaml_path)
  if 'TF_CONFIG' in os.environ:
    del os.environ['TF_CONFIG']
  pipeline_config = _load_config_for_test(pipeline_config_path, test_dir,
                                          total_steps)
  logging.info('model_dir in pipeline_config has been modified')
  pipeline_config.train_config.train_distribute = 0
  pipeline_config.train_config.num_gpus_per_worker = 1
  pipeline_config.train_config.sync_replicas = False
  if process_pipeline_func is not None:
    assert callable(process_pipeline_func)
    pipeline_config = process_pipeline_func(pipeline_config)
  config_util.save_pipeline_config(pipeline_config, test_dir)
  test_pipeline_config_path = os.path.join(test_dir, 'pipeline.config')
  yaml_replace(train_yaml_path, pipeline_config_path,
               test_pipeline_config_path)
  logging.info('test_pipeline_config_path is %s' % test_pipeline_config_path)
  train_cmd = 'el_submit -yaml %s' % train_yaml_path
  proc = subprocess.Popen(train_cmd.split(), stderr=subprocess.STDOUT)
  proc.wait()
  if proc.returncode != 0:
    logging.error('train %s failed' % test_pipeline_config_path)
    logging.error('train_yaml %s failed' % train_yaml_path)
  return proc.returncode == 0


def test_hdfs_eval(pipeline_config_path,
                   eval_yaml_path,
                   test_dir,
                   process_pipeline_func=None,
                   hyperparam_str=''):
  gpus = get_available_gpus()
  if len(gpus) > 0:
    set_gpu_id(gpus[0])
  else:
    set_gpu_id(None)
  logging.info('testing eval pipeline config %s' % pipeline_config_path)
  logging.info('eval_yaml_path %s' % eval_yaml_path)
  if 'TF_CONFIG' in os.environ:
    del os.environ['TF_CONFIG']
  pipeline_config = _load_config_for_test_eval(pipeline_config_path)
  if process_pipeline_func is not None:
    assert callable(process_pipeline_func)
    pipeline_config = process_pipeline_func(pipeline_config)
  config_util.save_pipeline_config(pipeline_config, test_dir)
  test_pipeline_config_path = os.path.join(test_dir, 'pipeline.config')
  yaml_replace(eval_yaml_path, pipeline_config_path,
               test_pipeline_config_path)
  logging.info('test_pipeline_config_path is %s' % test_pipeline_config_path)
  eval_cmd = 'el_submit -yaml %s' % eval_yaml_path
  proc = subprocess.Popen(eval_cmd.split(), stderr=subprocess.STDOUT)
  proc.wait()
  if proc.returncode != 0:
    logging.error('eval %s failed' % test_pipeline_config_path)
    logging.error('eval_yaml %s failed' % eval_yaml_path)
  return proc.returncode == 0


def test_hdfs_export(pipeline_config_path,
                     export_yaml_path,
                     test_dir,
                     process_pipeline_func=None,
                     hyperparam_str=''):
  gpus = get_available_gpus()
  if len(gpus) > 0:
    set_gpu_id(gpus[0])
  else:
    set_gpu_id(None)
  logging.info('testing export pipeline config %s' % pipeline_config_path)
  logging.info('export_yaml_path %s' % export_yaml_path)
  if 'TF_CONFIG' in os.environ:
    del os.environ['TF_CONFIG']
  pipeline_config = _load_config_for_test_eval(pipeline_config_path)
  if process_pipeline_func is not None:
    assert callable(process_pipeline_func)
    pipeline_config = process_pipeline_func(pipeline_config)
  config_util.save_pipeline_config(pipeline_config, test_dir)
  test_pipeline_config_path = os.path.join(test_dir, 'pipeline.config')
  test_export_path = os.path.join(test_dir, 'export_dir')
  yaml_replace(export_yaml_path, pipeline_config_path,
               test_pipeline_config_path, test_export_path)
  logging.info('test_pipeline_config_path is %s' % test_pipeline_config_path)
  eval_cmd = 'el_submit -yaml %s' % export_yaml_path
  proc = subprocess.Popen(eval_cmd.split(), stderr=subprocess.STDOUT)
  proc.wait()
  if proc.returncode != 0:
    logging.error('export %s failed' % test_pipeline_config_path)
    logging.error('export_yaml %s failed' % export_yaml_path)
  return proc.returncode == 0


def _ports_in_use(ports):
  ports_str = ''
  for i, port in enumerate(ports):
    if i > 0:
      ports_str += '|'
    ports_str += '0.0.0.0:%d|127.0.0.1:%d' % (port, port)
  stat, output = getstatusoutput('netstat -tlnp | grep -E %s' % ports_str)
  return stat == 0


def _get_ports(num_worker):
  port_base = int(os.environ.get('PORT_BASE', 10000))
  num_try = 10
  for i in range(num_try):
    ports = np.random.randint(port_base, port_base + 5000, size=num_worker)
    if not _ports_in_use(ports):
      return ports
    logging.info('ports %s in use, retry...' % ports)
  # all tries failed: make the implicit None return explicit
  return None


def _ps_worker_train(pipeline_config_path, test_dir, num_worker,
                     num_evaluator=0):
  gpus = get_available_gpus()
  # not enough gpus, run on cpu only
  if len(gpus) < num_worker:
    gpus = [None] * num_worker
  ports = _get_ports(num_worker + 1)
  chief_or_master = 'master' if num_evaluator == 0 else 'chief'
  cluster = {
      chief_or_master: ['localhost:%d' % ports[0]],
      'worker': ['localhost:%d' % ports[i] for i in range(1, num_worker)],
      'ps': ['localhost:%d' % ports[-1]]
  }
  tf_config = {'cluster': cluster}
  procs = {}
  tf_config['task'] = {'type': chief_or_master, 'index': 0}
  os.environ['TF_CONFIG'] = json.dumps(tf_config)
  set_gpu_id(gpus[0])
  train_cmd = 'python -m easy_rec.python.train_eval --pipeline_config_path %s' % pipeline_config_path
  procs[chief_or_master] = run_cmd(
      train_cmd, '%s/log_%s.txt' % (test_dir, chief_or_master))
  tf_config['task'] = {'type': 'ps', 'index': 0}
  os.environ['TF_CONFIG'] = json.dumps(tf_config)
  set_gpu_id('')
  procs['ps'] = run_cmd(train_cmd, '%s/log_%s.txt' % (test_dir, 'ps'))
  for idx in range(num_worker - 1):
    tf_config['task'] = {'type': 'worker', 'index': idx}
    os.environ['TF_CONFIG'] = json.dumps(tf_config)
    set_gpu_id(gpus[idx + 1])
    worker_name = 'worker_%d' % idx
    procs[worker_name] = run_cmd(train_cmd,
                                 '%s/log_%s.txt' % (test_dir, worker_name))
  if num_evaluator > 0:
    tf_config['task'] = {'type': 'evaluator', 'index': 0}
    os.environ['TF_CONFIG'] = json.dumps(tf_config)
    set_gpu_id('')
    procs['evaluator'] = run_cmd(train_cmd,
                                 '%s/log_%s.txt' % (test_dir, 'evaluator'))
  return procs


def _multi_worker_mirror_train(pipeline_config_path, test_dir, num_worker):
  gpus = get_available_gpus()
  # not enough gpus, run on cpu only
  if len(gpus) < num_worker:
    gpus = [None] * num_worker
  ports = _get_ports(num_worker)
  tf_config = {
      'cluster': {
          'worker': ['localhost:%d' % ports[i] for i in range(num_worker)]
      }
  }
  procs = {}
  train_cmd = 'python -m easy_rec.python.train_eval --pipeline_config_path %s' % pipeline_config_path
  for idx in range(num_worker):
    tf_config['task'] = {'type': 'worker', 'index': idx}
    os.environ['TF_CONFIG'] = json.dumps(tf_config)
    set_gpu_id(gpus[idx])
    worker_name = 'worker_%d' % idx
    procs[worker_name] = run_cmd(train_cmd,
                                 '%s/log_%s.txt' % (test_dir, worker_name))
  return procs


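# Illustrative sketch (not part of the original module): for num_worker=2 and
# num_evaluator=0, _ps_worker_train exports TF_CONFIG values of the following
# shape to each child process (ports are whatever _get_ports returned):
#
#   {"cluster": {"master": ["localhost:10123"],
#                "worker": ["localhost:10456"],
#                "ps": ["localhost:10789"]},
#    "task": {"type": "master", "index": 0}}

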
def test_distributed_train_eval(pipeline_config_path,
                                test_dir,
                                total_steps=50,
                                num_evaluator=0):
  logging.info('testing pipeline config %s' % pipeline_config_path)
  pipeline_config = _load_config_for_test(pipeline_config_path, test_dir,
                                          total_steps)
  train_config = pipeline_config.train_config
  config_util.save_pipeline_config(pipeline_config, test_dir)
  test_pipeline_config_path = os.path.join(test_dir, 'pipeline.config')

  task_failed = None
  procs = None
  try:
    if train_config.train_distribute == DistributionStrategy.NoStrategy:
      num_worker = 2
      procs = _ps_worker_train(test_pipeline_config_path, test_dir, num_worker,
                               num_evaluator)
    elif train_config.train_distribute == DistributionStrategy.MultiWorkerMirroredStrategy:
      num_worker = 2
      procs = _multi_worker_mirror_train(test_pipeline_config_path, test_dir,
                                         num_worker)
    else:
      raise NotImplementedError

    # print proc info
    assert len(procs) > 0, 'processes are empty'
    for k, proc in procs.items():
      logging.info('%s pid: %d' % (k, proc.pid))

    task_finish_cnt = 0
    task_has_finished = {k: False for k in procs.keys()}
    while True:
      for k, proc in procs.items():
        if proc.poll() is None:
          if task_failed is not None:
            logging.error('task %s failed, %s quit' % (task_failed, k))
            proc.terminate()
            if k != 'ps':
              task_has_finished[k] = True
              task_finish_cnt += 1
            logging.info('task_finish_cnt %d' % task_finish_cnt)
        else:
          if not task_has_finished[k]:
            # process quit by itself
            if k != 'ps':
              task_finish_cnt += 1
              task_has_finished[k] = True
            logging.info('task_finish_cnt %d' % task_finish_cnt)
            if proc.returncode != 0:
              logging.error('%s failed' % k)
              task_failed = k
            else:
              logging.info('%s run successfully' % k)
      if task_finish_cnt >= num_worker:
        break
      time.sleep(1)
  except Exception as e:
    logging.error('Exception: ' + str(e))
    raise e
  finally:
    if procs is not None:
      for k, proc in procs.items():
        if proc.poll() is None:
          logging.info('terminate %s' % k)
          proc.terminate()

  if task_failed is not None:
    logging.error('train %s failed' % pipeline_config_path)
  return task_failed is None
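

# Illustrative usage (a sketch, not part of the original module): driving a
# 2-worker PS-style run from a unit test. The config path is hypothetical and
# its train_config.train_distribute must be NoStrategy for the PS branch above.
def _example_distributed_train_eval():
  test_dir = get_tmp_dir()
  try:
    assert test_distributed_train_eval(
        'samples/model_config/deepfm.config', test_dir, total_steps=10)
  finally:
    clean_up(test_dir)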