# Source code for easy_rec.python.core.sampler

# -*- encoding:utf-8 -*-
# Copyright (c) Alibaba, Inc. and its affiliates.
from __future__ import division
from __future__ import print_function

import json
import logging
import math
import os
import threading

import numpy as np
import tensorflow as tf

from easy_rec.python.protos.dataset_pb2 import DatasetConfig

try:
  import graphlearn as gl
except Exception:
  logging.info(
      'GraphLearn is not installed. You can install it by "pip install https://easyrec.oss-cn-beijing.aliyuncs.com/3rdparty/graphlearn-0.7-cp27-cp27mu-linux_x86_64.whl"'  # noqa: E501
  )

if tf.__version__ >= '2.0':
  tf = tf.compat.v1


def _get_gl_type(field_type):
  """Translate a DatasetConfig field type into a graphlearn attr type name.

  Args:
    field_type: a DatasetConfig.FieldType enum value.

  Returns:
    One of 'int', 'float' or 'string'.
  """
  # bool and both int widths are stored as graphlearn ints;
  # float and double both map to graphlearn floats.
  gl_types = {
      DatasetConfig.INT32: 'int',
      DatasetConfig.INT64: 'int',
      DatasetConfig.BOOL: 'int',
      DatasetConfig.STRING: 'string',
      DatasetConfig.FLOAT: 'float',
      DatasetConfig.DOUBLE: 'float',
  }
  assert field_type in gl_types, 'invalid type: %s' % field_type
  return gl_types[field_type]


def _get_np_type(field_type):
  """Map a DatasetConfig field type to the numpy dtype used by astype().

  Uses the builtins ``str`` / ``bool`` instead of ``np.str`` / ``np.bool``:
  those numpy aliases were plain aliases of the builtins, were deprecated in
  numpy 1.20 and removed in numpy 1.24, so the original code raises
  AttributeError on modern numpy. Behavior is unchanged on older numpy.

  Args:
    field_type: a DatasetConfig.FieldType enum value.

  Returns:
    A numpy-compatible dtype object.
  """
  type_map = {
      DatasetConfig.INT32: np.int32,
      DatasetConfig.INT64: np.int64,
      DatasetConfig.STRING: str,
      DatasetConfig.BOOL: bool,
      DatasetConfig.FLOAT: np.float32,
      DatasetConfig.DOUBLE: np.double
  }
  assert field_type in type_map, 'invalid type: %s' % field_type
  return type_map[field_type]


def _get_tf_type(field_type):
  """Translate a DatasetConfig field type into the matching tf dtype.

  Args:
    field_type: a DatasetConfig.FieldType enum value.

  Returns:
    The corresponding tf.DType.
  """
  tf_types = {
      DatasetConfig.INT32: tf.int32,
      DatasetConfig.INT64: tf.int64,
      DatasetConfig.BOOL: tf.bool,
      DatasetConfig.STRING: tf.string,
      DatasetConfig.FLOAT: tf.float32,
      DatasetConfig.DOUBLE: tf.double,
  }
  assert field_type in tf_types, 'invalid type: %s' % field_type
  return tf_types[field_type]


class BaseSampler(object):
  """Base class of GraphLearn-backed negative samplers.

  Subclasses build ``self._g`` (a graphlearn Graph) and one or more samplers
  in ``__init__``, then call ``_init_graph()`` to attach the graph to the
  TF cluster (ps mode, worker-only mode, or local mode).
  """

  _instance_lock = threading.Lock()

  def __init__(self, fields, num_sample, num_eval_sample=None):
    """Initialize common sampler state.

    Args:
      fields: input field configs, each exposing input_name and input_type.
      num_sample: number of negative samples per mini-batch.
      num_eval_sample: number of negative samples for evaluation; falls back
        to num_sample when falsy.
    """
    self._g = None
    self._sampler = None
    # TODO(hongsheng.jhs): check eval mode or not?
    self._num_sample = num_sample
    self._num_eval_sample = num_eval_sample if num_eval_sample else num_sample
    self._build_field_types(fields)

  @classmethod
  def instance(cls, *args, **kwargs):
    """Return the process-wide singleton, constructing it on first use."""
    with cls._instance_lock:
      if not hasattr(cls, '_instance'):
        cls._instance = cls(*args, **kwargs)
    return cls._instance

  def __del__(self):
    # Guard against __init__ having failed (or a subclass never building the
    # graph): the original unconditional self._g.close() raised
    # AttributeError on None during interpreter shutdown.
    if getattr(self, '_g', None) is not None:
      self._g.close()

  def _init_graph(self):
    """Attach the graphlearn graph according to the TF_CONFIG cluster layout."""
    if 'TF_CONFIG' in os.environ:
      # Parse TF_CONFIG once (the original re-parsed it in the ps branch).
      tf_config = json.loads(os.environ['TF_CONFIG'])
      task_type = tf_config['task']['type']
      task_index = tf_config['task']['index']
      if 'ps' in tf_config['cluster']:
        # ps mode: graphlearn servers colocate with ps nodes; clients are
        # chief/master (slot 0), evaluator (slot 1), and workers (slot 2+).
        ps_count = len(tf_config['cluster']['ps'])
        task_count = len(tf_config['cluster']['worker']) + 2
        cluster = {'server_count': ps_count, 'client_count': task_count}
        if task_type in ['chief', 'master']:
          self._g.init(cluster=cluster, job_name='client', task_index=0)
        elif task_type == 'worker':
          self._g.init(
              cluster=cluster, job_name='client', task_index=task_index + 2)
        # TODO(hongsheng.jhs): check cluster has evaluator or not?
        elif task_type == 'evaluator':
          self._g.init(
              cluster=cluster, job_name='client', task_index=task_index + 1)
          if self._num_eval_sample is not None and self._num_eval_sample > 0:
            self._num_sample = self._num_eval_sample
        elif task_type == 'ps':
          self._g.init(
              cluster=cluster, job_name='server', task_index=task_index)
      else:
        # worker mode: no dedicated servers; chief/master is slot 0 and
        # workers start at slot 1.
        task_count = len(tf_config['cluster']['worker']) + 1
        if task_type in ['chief', 'master']:
          self._g.init(task_index=0, task_count=task_count)
        elif task_type == 'worker':
          self._g.init(task_index=task_index + 1, task_count=task_count)
        # TODO(hongsheng.jhs): check cluster has evaluator or not?
    else:
      # local mode
      self._g.init()

  def _build_field_types(self, fields):
    """Cache field names plus graphlearn / numpy / tf type mappings."""
    self._attr_names = []
    self._attr_types = []
    self._attr_gl_types = []
    self._attr_np_types = []
    self._attr_tf_types = []
    for field in fields:
      self._attr_names.append(field.input_name)
      self._attr_types.append(field.input_type)
      self._attr_gl_types.append(_get_gl_type(field.input_type))
      self._attr_np_types.append(_get_np_type(field.input_type))
      self._attr_tf_types.append(_get_tf_type(field.input_type))

  def _parse_nodes(self, nodes):
    """Flatten dense sampled nodes into one numpy array per attribute.

    Args:
      nodes: graphlearn Nodes with 3-d int_attrs / float_attrs / string_attrs.

    Returns:
      List of 1-d numpy arrays, each truncated to self._num_sample entries.
    """
    features = []
    int_idx = 0
    float_idx = 0
    string_idx = 0
    for attr_gl_type, attr_np_type in zip(self._attr_gl_types,
                                          self._attr_np_types):
      if attr_gl_type == 'int':
        feature = nodes.int_attrs[:, :, int_idx]
        int_idx += 1
      elif attr_gl_type == 'float':
        feature = nodes.float_attrs[:, :, float_idx]
        float_idx += 1
      elif attr_gl_type == 'string':
        feature = nodes.string_attrs[:, :, string_idx]
        string_idx += 1
      else:
        raise ValueError('Unknown attr type %s' % attr_gl_type)
      # expand_factor * batch_size may exceed num_sample; cap after flatten.
      feature = np.reshape(feature,
                           [-1])[:self._num_sample].astype(attr_np_type)
      features.append(feature)
    return features

  def _parse_sparse_nodes(self, nodes):
    """Convert sparse sampled nodes into per-attribute arrays plus indices.

    Args:
      nodes: graphlearn sparse Nodes with 2-d attrs and an indices field.

    Returns:
      Tuple of (list of 1-d numpy arrays, sparse indices).
    """
    features = []
    int_idx = 0
    float_idx = 0
    string_idx = 0
    for attr_gl_type, attr_np_type in zip(self._attr_gl_types,
                                          self._attr_np_types):
      if attr_gl_type == 'int':
        feature = nodes.int_attrs[:, int_idx]
        int_idx += 1
      elif attr_gl_type == 'float':
        feature = nodes.float_attrs[:, float_idx]
        float_idx += 1
      elif attr_gl_type == 'string':
        feature = nodes.string_attrs[:, string_idx]
        string_idx += 1
      else:
        raise ValueError('Unknown attr type %s' % attr_gl_type)
      features.append(feature.astype(attr_np_type))
    return features, nodes.indices
class NegativeSampler(BaseSampler):
  """Negative Sampler.

  Weighted random sampling items not in batch.

  Args:
    data_path: item feature data path. id:int64 | weight:float | attrs:string.
    fields: item input fields.
    num_sample: number of negative samples.
    batch_size: mini-batch size.
    attr_delimiter: delimiter of feature string.
    num_eval_sample: number of negative samples for evaluator.
  """

  def __init__(self,
               data_path,
               fields,
               num_sample,
               batch_size,
               attr_delimiter=':',
               num_eval_sample=None):
    super(NegativeSampler, self).__init__(fields, num_sample, num_eval_sample)
    self._batch_size = batch_size
    item_decoder = gl.Decoder(
        attr_types=self._attr_gl_types,
        weighted=True,
        attr_delimiter=attr_delimiter)
    self._g = gl.Graph().node(
        tf.compat.as_str(data_path), node_type='item', decoder=item_decoder)
    self._init_graph()
    # draw ceil(num_sample / batch_size) items per input id
    expand_factor = int(math.ceil(self._num_sample / batch_size))
    self._sampler = self._g.negative_sampler(
        'item', expand_factor, strategy='node_weight')

  def _get_impl(self, ids):
    # numpy bridge used inside tf.py_func
    ids = np.array(ids, dtype=np.int64)
    sampled = self._sampler.get(ids)
    return self._parse_nodes(sampled)

  def get(self, ids):
    """Sampling method.

    Args:
      ids: item id tensor.

    Returns:
      Negative sampled feature dict.
    """
    sampled_values = tf.py_func(self._get_impl, [ids], self._attr_tf_types)
    result_dict = {}
    for name, dtype, value in zip(self._attr_names, self._attr_tf_types,
                                  sampled_values):
      if dtype == tf.string:
        # string convert from np array to tensor will be padded with \000,
        # we need remove it
        value = tf.regex_replace(value, '\000', '')
      value.set_shape([self._num_sample])
      result_dict[name] = value
    return result_dict
class NegativeSamplerV2(BaseSampler):
  """Negative Sampler V2.

  Weighted random sampling items which do not have positive edge with the user.

  Args:
    user_data_path: user node data path. id:int64 | weight:float.
    item_data_path: item feature data path. id:int64 | weight:float | attrs:string.
    edge_data_path: positive edge data path.
      userid:int64 | itemid:int64 | weight:float
    fields: item input fields.
    num_sample: number of negative samples.
    batch_size: mini-batch size.
    attr_delimiter: delimiter of feature string.
    num_eval_sample: number of negative samples for evaluator.
  """

  def __init__(self,
               user_data_path,
               item_data_path,
               edge_data_path,
               fields,
               num_sample,
               batch_size,
               attr_delimiter=':',
               num_eval_sample=None):
    super(NegativeSamplerV2, self).__init__(fields, num_sample,
                                            num_eval_sample)
    self._batch_size = batch_size
    item_decoder = gl.Decoder(
        attr_types=self._attr_gl_types,
        weighted=True,
        attr_delimiter=attr_delimiter)
    weighted_decoder = gl.Decoder(weighted=True)
    self._g = gl.Graph() \
        .node(tf.compat.as_str(user_data_path),
              node_type='user',
              decoder=weighted_decoder) \
        .node(tf.compat.as_str(item_data_path),
              node_type='item',
              decoder=item_decoder) \
        .edge(tf.compat.as_str(edge_data_path),
              edge_type=('user', 'item', 'edge'),
              decoder=weighted_decoder)
    self._init_graph()
    # draw ceil(num_sample / batch_size) items per (user, item) pair,
    # conditioned on the positive 'edge' relation
    expand_factor = int(math.ceil(self._num_sample / batch_size))
    self._sampler = self._g.negative_sampler(
        'edge', expand_factor, strategy='random', conditional=True)

  def _get_impl(self, src_ids, dst_ids):
    # numpy bridge used inside tf.py_func
    src_ids = np.array(src_ids, dtype=np.int64)
    dst_ids = np.array(dst_ids, dtype=np.int64)
    sampled = self._sampler.get(src_ids, dst_ids)
    return self._parse_nodes(sampled)

  def get(self, src_ids, dst_ids):
    """Sampling method.

    Args:
      src_ids: user id tensor.
      dst_ids: item id tensor.

    Returns:
      Negative sampled feature dict.
    """
    sampled_values = tf.py_func(self._get_impl, [src_ids, dst_ids],
                                self._attr_tf_types)
    result_dict = {}
    for name, dtype, value in zip(self._attr_names, self._attr_tf_types,
                                  sampled_values):
      if dtype == tf.string:
        # string convert from np array to tensor will be padded with \000,
        # we need remove it
        value = tf.regex_replace(value, '\000', '')
      value.set_shape([self._num_sample])
      result_dict[name] = value
    return result_dict
class HardNegativeSampler(BaseSampler):
  """HardNegativeSampler.

  Weighted random sampling items not in batch as negative samples, and sampling
  destination nodes in hard_neg_edge as hard negative samples

  Args:
    user_data_path: user node data path. id:int64 | weight:float.
    item_data_path: item feature data path. id:int64 | weight:float | attrs:string.
    hard_neg_edge_data_path: hard negative edge data path.
      userid:int64 | itemid:int64 | weight:float
    fields: item input fields.
    num_sample: number of negative samples.
    num_hard_sample: maximum number of hard negative samples.
    batch_size: mini-batch size.
    attr_delimiter: delimiter of feature string.
    num_eval_sample: number of negative samples for evaluator.
  """

  def __init__(self,
               user_data_path,
               item_data_path,
               hard_neg_edge_data_path,
               fields,
               num_sample,
               num_hard_sample,
               batch_size,
               attr_delimiter=':',
               num_eval_sample=None):
    super(HardNegativeSampler, self).__init__(fields, num_sample,
                                              num_eval_sample)
    self._batch_size = batch_size
    item_decoder = gl.Decoder(
        attr_types=self._attr_gl_types,
        weighted=True,
        attr_delimiter=attr_delimiter)
    weighted_decoder = gl.Decoder(weighted=True)
    self._g = gl.Graph() \
        .node(tf.compat.as_str(user_data_path),
              node_type='user',
              decoder=weighted_decoder) \
        .node(tf.compat.as_str(item_data_path),
              node_type='item',
              decoder=item_decoder) \
        .edge(tf.compat.as_str(hard_neg_edge_data_path),
              edge_type=('user', 'item', 'hard_neg_edge'),
              decoder=weighted_decoder)
    self._init_graph()
    # plain weighted negatives over all items
    expand_factor = int(math.ceil(self._num_sample / batch_size))
    self._neg_sampler = self._g.negative_sampler(
        'item', expand_factor, strategy='node_weight')
    # hard negatives: every item linked to the user by a hard_neg_edge
    self._hard_neg_sampler = self._g.neighbor_sampler(['hard_neg_edge'],
                                                      num_hard_sample,
                                                      strategy='full')

  def _get_impl(self, src_ids, dst_ids):
    # numpy bridge used inside tf.py_func
    src_ids = np.array(src_ids, dtype=np.int64)
    dst_ids = np.array(dst_ids, dtype=np.int64)
    neg_features = self._parse_nodes(self._neg_sampler.get(dst_ids))
    sparse_nodes = self._hard_neg_sampler.get(src_ids).layer_nodes(1)
    hard_neg_features, hard_neg_indices = self._parse_sparse_nodes(
        sparse_nodes)
    # per attribute: regular negatives first, then hard negatives
    results = [
        np.concatenate([neg_feature, hard_feature], axis=-1)
        for neg_feature, hard_feature in zip(neg_features, hard_neg_features)
    ]
    results.append(hard_neg_indices)
    return results

  def get(self, src_ids, dst_ids):
    """Sampling method.

    Args:
      src_ids: user id tensor.
      dst_ids: item id tensor.

    Returns:
      Sampled feature dict. The first batch_size is negative samples, remainder
      is hard negative samples
    """
    output_types = self._attr_tf_types + [tf.int64]
    output_values = tf.py_func(self._get_impl, [src_ids, dst_ids],
                               output_types)
    result_dict = {}
    for name, dtype, value in zip(self._attr_names, self._attr_tf_types,
                                  output_values[:-1]):
      if dtype == tf.string:
        # string convert from np array to tensor will be padded with \000,
        # we need remove it
        value = tf.regex_replace(value, '\000', '')
      value.set_shape([None])
      result_dict[name] = value
    hard_neg_indices = output_values[-1]
    hard_neg_indices.set_shape([None, 2])
    result_dict['hard_neg_indices'] = hard_neg_indices
    return result_dict
class HardNegativeSamplerV2(BaseSampler):
  """HardNegativeSampler.

  Weighted random sampling items which do not have positive edge with the
  user., and sampling destination nodes in hard_neg_edge as hard negative
  samples

  Args:
    user_data_path: user node data path. id:int64 | weight:float.
    item_data_path: item feature data path. id:int64 | weight:float | attrs:string.
    edge_data_path: positive edge data path.
      userid:int64 | itemid:int64 | weight:float
    hard_neg_edge_data_path: hard negative edge data path.
      userid:int64 | itemid:int64 | weight:float
    fields: item input fields.
    num_sample: number of negative samples.
    num_hard_sample: maximum number of hard negative samples.
    batch_size: mini-batch size.
    attr_delimiter: delimiter of feature string.
    num_eval_sample: number of negative samples for evaluator.
  """

  def __init__(self,
               user_data_path,
               item_data_path,
               edge_data_path,
               hard_neg_edge_data_path,
               fields,
               num_sample,
               num_hard_sample,
               batch_size,
               attr_delimiter=':',
               num_eval_sample=None):
    super(HardNegativeSamplerV2, self).__init__(fields, num_sample,
                                                num_eval_sample)
    self._batch_size = batch_size
    item_decoder = gl.Decoder(
        attr_types=self._attr_gl_types,
        weighted=True,
        attr_delimiter=attr_delimiter)
    weighted_decoder = gl.Decoder(weighted=True)
    self._g = gl.Graph() \
        .node(tf.compat.as_str(user_data_path),
              node_type='user',
              decoder=weighted_decoder) \
        .node(tf.compat.as_str(item_data_path),
              node_type='item',
              decoder=item_decoder) \
        .edge(tf.compat.as_str(edge_data_path),
              edge_type=('user', 'item', 'edge'),
              decoder=weighted_decoder) \
        .edge(tf.compat.as_str(hard_neg_edge_data_path),
              edge_type=('user', 'item', 'hard_neg_edge'),
              decoder=weighted_decoder)
    self._init_graph()
    # negatives conditioned on the positive 'edge' relation
    expand_factor = int(math.ceil(self._num_sample / batch_size))
    self._neg_sampler = self._g.negative_sampler(
        'edge', expand_factor, strategy='random', conditional=True)
    # hard negatives: every item linked to the user by a hard_neg_edge
    self._hard_neg_sampler = self._g.neighbor_sampler(['hard_neg_edge'],
                                                      num_hard_sample,
                                                      strategy='full')

  def _get_impl(self, src_ids, dst_ids):
    # numpy bridge used inside tf.py_func
    src_ids = np.array(src_ids, dtype=np.int64)
    dst_ids = np.array(dst_ids, dtype=np.int64)
    neg_features = self._parse_nodes(self._neg_sampler.get(src_ids, dst_ids))
    sparse_nodes = self._hard_neg_sampler.get(src_ids).layer_nodes(1)
    hard_neg_features, hard_neg_indices = self._parse_sparse_nodes(
        sparse_nodes)
    # per attribute: regular negatives first, then hard negatives
    results = [
        np.concatenate([neg_feature, hard_feature], axis=-1)
        for neg_feature, hard_feature in zip(neg_features, hard_neg_features)
    ]
    results.append(hard_neg_indices)
    return results

  def get(self, src_ids, dst_ids):
    """Sampling method.

    Args:
      src_ids: user id tensor.
      dst_ids: item id tensor.

    Returns:
      Sampled feature dict. The first batch_size is negative samples, remainder
      is hard negative samples
    """
    output_types = self._attr_tf_types + [tf.int64]
    output_values = tf.py_func(self._get_impl, [src_ids, dst_ids],
                               output_types)
    result_dict = {}
    for name, dtype, value in zip(self._attr_names, self._attr_tf_types,
                                  output_values[:-1]):
      if dtype == tf.string:
        # string convert from np array to tensor will be padded with \000,
        # we need remove it
        value = tf.regex_replace(value, '\000', '')
      value.set_shape([None])
      result_dict[name] = value
    hard_neg_indices = output_values[-1]
    hard_neg_indices.set_shape([None, 2])
    result_dict['hard_neg_indices'] = hard_neg_indices
    return result_dict
def build(data_config):
  """Build the negative sampler configured in data_config.

  Fixes a statement broken across lines in the original (`return` separated
  from the HardNegativeSamplerV2 call) and hoists the attr-field resolution
  that was duplicated identically in all four branches. The sampler type is
  validated first so an unknown type still raises ValueError.

  Args:
    data_config: DatasetConfig proto whose ``sampler`` oneof selects the
      sampler type and its parameters.

  Returns:
    A sampler singleton instance, or None when no sampler is configured.

  Raises:
    ValueError: if the configured sampler type is not supported.
  """
  if not data_config.HasField('sampler'):
    return None
  sampler_type = data_config.WhichOneof('sampler')
  if sampler_type not in ('negative_sampler', 'negative_sampler_v2',
                          'hard_negative_sampler', 'hard_negative_sampler_v2'):
    raise ValueError('Unknown sampler %s' % sampler_type)
  sampler_config = getattr(data_config, sampler_type)
  # Resolve attr field configs once; every sampler type needs the same list.
  input_fields = {f.input_name: f for f in data_config.input_fields}
  attr_fields = [input_fields[name] for name in sampler_config.attr_fields]
  if sampler_type == 'negative_sampler':
    return NegativeSampler.instance(
        data_path=sampler_config.input_path,
        fields=attr_fields,
        num_sample=sampler_config.num_sample,
        batch_size=data_config.batch_size,
        attr_delimiter=sampler_config.attr_delimiter,
        num_eval_sample=sampler_config.num_eval_sample)
  elif sampler_type == 'negative_sampler_v2':
    return NegativeSamplerV2.instance(
        user_data_path=sampler_config.user_input_path,
        item_data_path=sampler_config.item_input_path,
        edge_data_path=sampler_config.pos_edge_input_path,
        fields=attr_fields,
        num_sample=sampler_config.num_sample,
        batch_size=data_config.batch_size,
        attr_delimiter=sampler_config.attr_delimiter,
        num_eval_sample=sampler_config.num_eval_sample)
  elif sampler_type == 'hard_negative_sampler':
    return HardNegativeSampler.instance(
        user_data_path=sampler_config.user_input_path,
        item_data_path=sampler_config.item_input_path,
        hard_neg_edge_data_path=sampler_config.hard_neg_edge_input_path,
        fields=attr_fields,
        num_sample=sampler_config.num_sample,
        num_hard_sample=sampler_config.num_hard_sample,
        batch_size=data_config.batch_size,
        attr_delimiter=sampler_config.attr_delimiter,
        num_eval_sample=sampler_config.num_eval_sample)
  else:
    return HardNegativeSamplerV2.instance(
        user_data_path=sampler_config.user_input_path,
        item_data_path=sampler_config.item_input_path,
        edge_data_path=sampler_config.pos_edge_input_path,
        hard_neg_edge_data_path=sampler_config.hard_neg_edge_input_path,
        fields=attr_fields,
        num_sample=sampler_config.num_sample,
        num_hard_sample=sampler_config.num_hard_sample,
        batch_size=data_config.batch_size,
        attr_delimiter=sampler_config.attr_delimiter,
        num_eval_sample=sampler_config.num_eval_sample)