Samples

Offline Samples

  • Offline samples can be constructed with SQL on the MaxCompute or Hive/Spark platforms (a minimal sketch follows this list).

  • You can use Recommendation Algorithm Customization (推荐算法定制) to automatically generate the offline feature and offline sample pipelines.
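
  • Example (a minimal MaxCompute SQL sketch; the tables user_expose_log and user_click_log and their columns are illustrative assumptions, not part of the official pipeline):

      -- derive a click label by left-joining exposure logs with click logs
      create table if not exists offline_sample as
      select e.request_id, e.user_id, e.item_id,
          if(c.item_id is not null, 1, 0) as clk
      from user_expose_log e
      left join user_click_log c
      on e.request_id = c.request_id and e.item_id = c.item_id;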

Sample Weight

  • Specify one input column as the sample weight

    • data_config.sample_weight

  • Example:

      data_config {
        input_fields {
          input_name: 'clk'
          input_type: DOUBLE
        }
        input_fields {
          input_name: 'field1'
          input_type: STRING
        }
        ...
        input_fields {
          input_name: 'sw'
          input_type: DOUBLE
        }
    
        sample_weight: 'sw'
    
        label_fields: 'clk'
        batch_size: 1024
        input_type: CSVInput
      }
    

Realtime Samples

Prerequisites

  • Service activation:

    • In addition to MaxCompute, OSS, DataWorks, and Hologres, you also need to activate Flink and Datahub (or Kafka)

    • For the detailed activation steps of each product, refer to the activation manual in the PAI-REC best practices

Data Preparation

  • Realtime user behavior stream

    • Ingested through Datahub

    • Contains the following fields (a DDL sketch follows this list):

      • event_type: event type, e.g. exposure / click / buy / like / play / …

      • event_time: time when the event occurred

      • duration: event duration, optional

      • request_id: request id

      • user_id: user id

      • item_id: item id

      • other fields, optional
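
    • A minimal Flink SQL DDL sketch for reading this stream from Datahub (the field names match the user_behavior_log schema used in the aggregation step below; the endpoint, project, and credentials are placeholders):

        create temporary table user_behavior_log (
          request_id string,
          user_id    string,
          item_id    string,
          event      string,  -- event_type
          ts         bigint,  -- event_time in seconds
          play_time  double,  -- duration, optional
          scene      string   -- extra information, optional
        ) with (
          'connector' = 'datahub',
          'endPoint' = 'http://dh-cn-beijing-int-vpc.aliyuncs.com/',
          'project' = 'easy_rec_proj',
          'topic' = 'user_behavior_log',
          'subId' = '<subscription_id>',
          'accessId' = '<access_id>',
          'accessKey' = '<access_key>'
        );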

  • Feature logging (callback)

    • Deploy an EAS callback service whose configuration is identical to that of the EAS scoring service

      • A separate EAS callback service is deployed to avoid affecting the performance of the EAS scoring service

      • The EAS callback service also has a looser latency (RT) requirement than the EAS scoring service

    • Write to Datahub through the PAI-REC recommendation engine

      • PAI-REC configuration

      • Features are saved to the Datahub topic odl_callback_log, with the following schema:

      field              type    description
      request_id         string  request id
      request_time       bigint  request time
      module             string  algorithm id
      user_id            string  user id
      item_id            string  item id
      scene              string  scene
      context_features   string  context features
      generate_features  string  features generated by FG
      raw_features       string  raw features
      request_info       string  request info
      item_features      string  item features
      user_features      string  user features
      callback_log       string  callback log
      • request_id, user_id, item_id, request_time, and generate_features are the fields needed by later steps

    • Custom recommendation engine:

      • Call the EAS service to obtain the fully logged features; see the documentation for the invocation details

        • The item list in the request should be the final delivered list, not the candidate list of the ranking stage

        • Call EasyrecRequest.setDebugLevel(3) to generate the features required for EasyRec training

      • Obtain the generated features via PBResponse.getGenerateFeaturesMap

      • Write the features to the Datahub topic odl_callback_log

Sample Generation

  1. Sample preprocessing

    • Extract exposure, click, and other event information:

        create temporary table user_click_log ...
      
        create temporary table user_expose_log ...
      
        create temporary table odl_sample_preprocess (
            request_id string,
            `user_id` string,
            item_id string,
            event_type string,
            event_time bigint,
            scene string
        ) with (
          'connector' = 'datahub',
          'endPoint' = 'http://dh-cn-beijing-int-vpc.aliyuncs.com/',
          'project' = 'easy_rec_proj',
          'topic' = 'odl_sample_preprocess',
          'subId' = '166987972XXXXXXX',
          'accessId' = 'LTAIXXXXXXXX',
          'accessKey' = 'XYZXXXXXXXX'
        );
      
        begin STATEMENT SET;
        insert into odl_sample_preprocess
        select aa as request_id, bb as user_id, cc as item_id, dd as event_type,
            ee as event_time, ff as scene
        from user_click_log;
      
        insert into odl_sample_preprocess
        select aa as request_id, bb as user_id, cc as item_id, 'expose' as event_type,
            ee as event_time, ff as scene
        from user_expose_log;

        end;
      
      • project: Datahub project

      • topic: Datahub topic

      • subId: Datahub subscription id; each Flink SQL job needs its own subscription, otherwise there will be conflicts

      • scene: extra field, optional

  2. Sample events aggregation (OnlineSampleAggr):

    • Upload the resource package: rec-realtime-0.8-SNAPSHOT.jar

    • Entry class: com.aliyun.rec.feature.job.OnlineSampleEventsAggr

    • Flink configuration:

        datahub.endpoint: 'http://dh-cn-beijing-int-vpc.aliyuncs.com/'
        datahub.accessId: xxx
        datahub.accessKey: xxx
        datahub.inputTopic: user_behavior_log
        datahub.sinkTopic: odl_sample_aggr
        datahub.projectName: odl_sample_preprocess
        datahub.startInSecs: '1655571600'
      
        input.userid: user_id
        input.itemid: item_id
        input.request-id: request_id
        input.event-type: event
        input.event-duration: play_time
        input.event-ts: ts
        input.expose-event: expose
        input.event-extra: 'scene'
        input.wait-positive-secs: '900'
        state.max-event-num: '100'
      
      • Datahub parameters

        • accessId: authentication id

        • accessKey: authentication secret

        • projectName: Datahub project name

        • endpoint: use the endpoint with VPC

        • inputTopic: the Datahub topic to read from

        • sinkTopic: the Datahub topic to write to

        • startInSecs: the offset at which to start reading, in seconds

      • input: Datahub schema configuration

        • userid: name of the user id field

        • itemid: name of the item id field

        • request-id: name of the request id field

        • event-duration: name of the event duration field

        • event-type: name of the event type field

        • event-ts: name of the event timestamp field (in seconds)

        • expose-event: the event type that represents exposure

          • exposure events that arrive late are not emitted downstream

          • other events that arrive late are emitted as supplements

        • event-extra: other event-related fields; separate multiple fields with ","

        • wait-positive-secs: how long to wait for positive samples, in seconds

      • state:

        • max-event-num: keep only the latest n events; n defaults to Integer.MAX_VALUE

      • debug-mode: defaults to false; when true, prints detailed event information as well as timer registration and firing information

      • Datahub topic schemas:

        • inputTopic: user_behavior_log

          request_id  user_id  item_id  play_time  event   ts      scene   ...
          string      string   string   double     string  bigint  string  ...
        • sinkTopic: odl_sample_aggr

          request_id  user_id  item_id  events
          string      string   string   string
        • events data format:

          [
           {"duration":6493,"eventTime":1659667790,"eventType":"play","properties":{"scene":"main"}},
           {"duration":6259,"eventTime":1659667796,"eventType":"play","properties":{"scene":"main"}}
          ]
          
  3. Label generation; three kinds of Python UDFs are currently provided:

    • playtime: sum_over(events, 'playtime')

    • click: has_event(events, 'click')

    • min_over / max_over: min_over(events, 'eventTime')

    • You can also write arbitrary custom UDFs in Python; see the documentation

    • Upload the UDF (vvp-console)

    • Example:

        insert into odl_sample_with_lbl
        select request_id, user_id, item_id,
            ln(if(playtime < 0, 0, playtime) + 1) as ln_play_time,
            cast((playtime > 10 or is_like > 0) as bigint) as is_valid_play,
            is_like, ts
        from (
            select *, sum_over(events, 'duration', TRUE) / 1000.0 as playtime,
                has_event(events, 'likes') as is_like,
                min_over(events, 'eventTime') as ts
            from odl_sample_aggr
        );
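
    • The insert above writes into odl_sample_with_lbl; a sink DDL sketch whose column list mirrors that insert (subscription id and credentials are placeholders):

        create temporary table odl_sample_with_lbl (
          request_id    string,
          user_id       string,
          item_id       string,
          ln_play_time  double,
          is_valid_play bigint,
          is_like       bigint,
          ts            bigint
        ) with (
          'connector' = 'datahub',
          'endPoint' = 'http://dh-cn-beijing-int-vpc.aliyuncs.com/',
          'project' = 'easy_rec_proj',
          'topic' = 'odl_sample_with_lbl',
          'subId' = '<subscription_id>',
          'accessId' = '<access_id>',
          'accessKey' = '<access_key>'
        );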
      
  4. Join samples with the logged (callback) features

      create temporary table odl_sample_with_lbl(
        `request_id`    STRING,
        `user_id`       STRING,
        `item_id`       STRING,
        `ln_play_time`  DOUBLE,
        `is_valid_play` BIGINT,
        `min_ts`        BIGINT,
        `max_ts`        BIGINT,
        `ts`            AS TO_TIMESTAMP(
              FROM_UNIXTIME(if (min_ts is not null and min_ts < UNIX_TIMESTAMP(),
               min_ts, UNIX_TIMESTAMP()), 'yyyy-MM-dd HH:mm:ss')),
        WATERMARK FOR `ts` AS `ts` - INTERVAL '5' SECOND
      ) WITH (
        'connector' = 'datahub',
        'endPoint' = 'http://dh-cn-beijing-int-vpc.aliyuncs.com/',
        'project' = 'easy_rec_proj',
        'topic' = 'odl_sample_with_lbl',
        'subId' = '165519436817538OG0',
        'accessId' = 'LTAIxxx',
        'accessKey' = 'xxxxxxxxx',
        'startTime' = '2022-07-02 14:30:00'
      );
    
      create temporary table odl_callback_log(
        `request_id`        STRING,
        `request_time`      BIGINT,
        `module`    STRING,
        `user_id`   STRING,
        `item_id`   STRING,
        `scene`     STRING,
        `generate_features` STRING,
        `ts`              AS
            TO_TIMESTAMP(FROM_UNIXTIME(if(request_time is not null and request_time < UNIX_TIMESTAMP(),
                  request_time, UNIX_TIMESTAMP()), 'yyyy-MM-dd HH:mm:ss')),
        WATERMARK FOR `ts` AS `ts` - INTERVAL '5' SECOND
      ) WITH (
        'connector' = 'datahub',
        'endPoint' = 'http://dh-cn-beijing-int-vpc.aliyuncs.com/',
        'project' = 'easy_rec_proj',
        'topic' = 'odl_callback_log',
        'subId' = '16567769418786B4JH',
        'accessId' = 'LTAIxxx',
        'accessKey' = 'xxxxxx',
        'startTime' = '2022-07-02 14:30:00'
      );
    
    
      create temporary view sample_view as
      select a.request_id, a.user_id, a.item_id, a.ln_play_time, a.is_valid_play, feature, b.request_time
      from  odl_sample_with_lbl a
      inner join (
           select * from (
            select request_id, item_id, request_time, generate_features as feature, ts,
              row_number() over(partition by request_id, item_id order by proctime() asc) as rn
            from odl_callback_log
            where `module` = 'item' and (generate_features is not null and generate_features <> '')
           ) where rn = 1
      ) b
      on a.request_id = b.request_id and a.item_id = b.item_id
      where a.ts between b.ts - INTERVAL '30' SECOND and b.ts + INTERVAL '30' MINUTE;
    
    • Notes on create temporary table:

      • When ts is used as the watermark, cap it at the current time to prevent abnormal timestamps from corrupting the watermark

      • A temporary table only needs to declare the fields it uses; there is no need to enumerate all fields

      • For more Datahub connector parameters, see the documentation

      • For the Kafka connector, see the documentation

    • odl_callback_log must be deduplicated (the row_number() ... where rn = 1 subquery above) to prevent duplicate samples caused by repeated callback calls

    • Enable TTL (in milliseconds) in the Flink configuration to bound the state size:

        table.exec.state.ttl: '2400000'
      
      • Setting the TTL (milliseconds) involves a trade-off between two factors:

        • the delay of odl_sample_with_lbl relative to the request time request_time

          • if the TTL is smaller than this relative delay, samples will be lost

          • to measure the relative delay (see the sketch after this list):

            • dump odl_sample_with_lbl / odl_callback_log to MaxCompute

            • join them by request_id and compute the difference of the timestamps

        • the larger the TTL, the larger the state and the longer checkpoints take, degrading performance

    • Enable Gemini KV separation in the state backend (the generate_features values are very large):

        state.backend.gemini.kv.separate.mode: GLOBAL_ENABLE
        state.backend.gemini.kv.separate.value.size.threshold: '500'
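
    • A minimal MaxCompute SQL sketch of the delay statistics above, assuming the two topics have been dumped into hypothetical tables odl_sample_with_lbl_mc and odl_callback_log_mc (the table names are illustrative):

        -- delay (in seconds) of each sample relative to its request time;
        -- the p99 delay gives a lower bound for table.exec.state.ttl
        -- (after converting to milliseconds)
        select percentile(delay, 0.5) as p50, percentile(delay, 0.99) as p99
        from (
            select a.min_ts - b.request_time as delay
            from odl_sample_with_lbl_mc a
            join odl_callback_log_mc b
            on a.request_id = b.request_id and a.item_id = b.item_id
        ) t;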
      
  5. Write realtime samples to Datahub / Kafka

      create temporary table odl_sample_with_fea_and_lbl(
        `request_id`    string,
        `user_id`        string,
        `item_id`          string,
        `ln_play_time`  double,
        `is_valid_play` bigint,
        `feature`       string,
        `request_time`  bigint
      ) WITH (
        'connector' = 'datahub',
        'endPoint' = 'http://dh-cn-beijing-int-vpc.aliyuncs.com/',
        'project' = 'odl_sample',
        'topic' = 'odl_sample_with_fea_and_lbl',
        'subId' = '1656xxxxxx',
        'accessId' = 'LTAIxxxxxxx',
        'accessKey' = 'Q82Mxxxxxxxx'
      );
      insert into odl_sample_with_fea_and_lbl
      select * from sample_view;
    
    • subId: datahub subscription id

Data Diagnosis

  • For realtime samples, verify that the following statistics are consistent with the offline samples:

    • Total sample count: samples that arrive late fail to join with the logged features, lowering the sample count; increase the interval join range and the state TTL

    • Positive/negative sample ratio: positive events that arrive late are emitted late, making the online positive ratio too low; increase wait-positive-secs

    • Feature consistency: check whether the EAS callback service and the EAS scoring service have identical configurations.

  • Verification methods:

    • Dump the realtime samples to MaxCompute / Hive and compare them with the offline data (see the sketch below)

    • Check the positive/negative label ratio in the summary of EasyRec training
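
    • A minimal MaxCompute SQL sketch of such a comparison, assuming the realtime samples were dumped into a hypothetical table odl_sample_mc and the offline samples live in offline_sample (names are illustrative):

        -- compare sample volume and positive ratio between online and offline
        select 'online' as src, count(*) as total, avg(is_valid_play) as pos_ratio
        from odl_sample_mc
        union all
        select 'offline' as src, count(*) as total, avg(is_valid_play) as pos_ratio
        from offline_sample;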

Realtime Training