当前位置:首页|资讯|人工智能|深度学习|机器学习

tensorflow 六种方法构建读入batch样本(含序列特征处理),踩坑经验值得收藏

作者:算法全栈之路发布时间:2023-02-19

tensorflow 六种方法构建读入batch样本(含序列特征处理),踩坑经验值得收藏

书接上文,对 图机器学习算法 感兴趣 的 同学 可以去 图算法十篇 之 图机器学习系列文章总结  这里查看,对 推荐广告算法 感兴趣 的同学 可以去这里 系列小作文之企业级机器学习pipline总结 查看,干货多多 哦! 而对 使用 tensorflow 实现 复杂机器学习/深度学习 模型 感兴趣 的同学, 欢迎关注 算法全栈之路 的 公众号 接下来 逐步更新 的 模型手把手系列 的文章~

本篇是 模型手把手系列 的第二篇文章,本系列的上一篇文章  模型手把手系列开篇 之 python、spark 和 java 生成TFrecord 中 我们 主要说明 了 如何用 多种方式 生成 tensorflow 官方推荐 的 数据格式 tfrecord 的 方法,而 本章 我们则 将继续 看看 tensorflow 如何 读取 各种类型的特征,特别是序列特征数据 ,并使用 多种方法生成 batch 训练样本,代码 涵盖 tensorflow 1.x 系列tensorflow 2.x 系列 的 方法,走过 路过 不能错过 哦 !

作者 花了 大量时间 来 整理 本文章里 介绍的 多种方法 的源码,是因为 在 当初写 图算法相关的文章之 图上 deepwalk 算法理论与实战,图算法之瑞士军刀篇(一) 以及 图上 deepwalk 算法理论 与 tensorflow keras 实战,图算法之瑞士军刀篇(二) 这 两篇 文章的时候, 小小的 batch 数据生成,坑死了我这个混迹于国内外互联网大厂多年的算法老同志~ ,有些问题 没遇到不算事,遇到了 找bub 真是要了我的小命了。 闲言少叙 ,就看文章干不干,转需吧 ~

本文主要讲解了 6 种 用tensorflow 1.x / 2.x 如何读取 训练数据,特别是 序列特征数据 的 处理方法。因为 这些方法 有着 各自的应用场景 和 各自使用特点,算是 对 上次遇坑 的报复性解决心理 吧,这里 我 全部 把开发列举 出来了,希望 可以 切实的帮助到 同样 遇到问题的老哥 。

老话说得好: 代码是表达程序员思想的最好语言。本文的 数据读入代码 ,刻意 剖析了 使用 tensorflow 多种方法读入 用户历史行为序列特征 的过程 ,代码 每个 单元cell 均可以 独立完美 运行成功,具有极高的 参考价值 哦。详细内容 直接 看代码 吧!~~

(1)代码时光

本文共介绍了 6种 tensorflow 读取数据  并 batch 训练 的方法,包括使用 slice_input_producer、from_tensor_slices、generate、interleave 以及 自定义 生成batch 数据 等方法,下面就让我们 一种一种方法 的 介绍吧,总有一种适合你的。

(1.0) 数据准备

本文 用到的 数据,从 内存中 读取csv的,我们在这里直接列出; 而使用到 tfrecord 的,我们则使用的上文 模型手把手系列开篇 之 python、spark 和 java 生成TFrecord  中python方法单机版生成的tfrecord 数据。

@ 欢迎关注作者公众号 算法全栈之路

import pandas as pd 

raw_df = pd.DataFrame([[28,12.1,'male',[1,2],1], [30,8.7, 'female',[3,4,5],0], [32,24.6,'female',[6,7,8,9,10],1]], columns=['age''price','sex','click_list','label'])

# 序列特征长度不够填充,使用 tf.train.batch 生成 batch 必须要定长序列 
max_len=5 
padding_value=0 
raw_df['click_list'] = raw_df['click_list'].apply(lambda x: x + [padding_value]*(max_len- len(x)))
raw_df['click_list_str'] = raw_df['click_list'].apply(lambda x: '#'.join(map(str, x)))

# 普通特征处理
raw_df['age'] = raw_df['age'].astype(str)
raw_df['sex'] = raw_df['sex'].astype(str)
raw_df['label'] = raw_df['label'].astype(str)

print(raw_df)

raw_df.to_csv("read_sample.csv",sep='\t',index=False)

代码 很 简单,我就 不赘述 了。

中间要 注意的是:click_list 这一列特征就是 序列特征 ,每个用户 的 历史行为序列 的长度 并非定长 ,但是在某些方法里, 生成batch特征的时候,要求list 类型的数据是定长的 ,所以 我这里 用 默认值 0 进行了 padding 填充 。

(1.1)tensorflow 1.x 使用 slice_input_producer 生成 batch 数据

看代码吧。

@ 欢迎关注作者公众号 算法全栈之路

import tensorflow.compat.v1 as tf
tf.compat.v1.disable_eager_execution()

# 创建输入数据队列
input_queue = tf.train.slice_input_producer(
    [raw_df['age'].to_list(), raw_df['price'].to_list(),raw_df['sex'].to_list(), raw_df['click_list'].to_list(),raw_df['label'].to_list()],
    shuffle=True
)

# 读取队列中的数据
all_sample_count = len(raw_df)
batch_size = 2
num_threads = 1
min_after_dequeue = 1

all_feature_batch = tf.train.batch(
    input_queue,
    batch_size=batch_size,
    num_threads=num_threads,
    capacity=min_after_dequeue + (num_threads + 1) * batch_size
)

# 打印输出结果
with tf.Session() as sess:
    # 初始化变量
    sess.run(tf.global_variables_initializer())
    # 启动队列操作
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)

    for i in range(all_sample_count//batch_size):
        age_batch, price_batch, sex_batch,click_list_batch,label_batch = sess.run(all_feature_batch)
        print(f"age_batch: {age_batch}\n price_batch: {price_batch}\n sex_batch: {sex_batch} n click_list_batch: {click_list_batch} n label_batch: {label_batch} ")

    coord.request_stop()
    coord.join(threads)

这里这里的 tf 是 import tensorflow.compat.v1 as tf ,适配于 tensorflow 1.x 系列 的模型。

这里主要用了 tf.train.slice_input_producer 和 tf.train.batch 数据来 生成batch 数据。

还是 重点说下序列特征 列 click_list_batch ,这里 读入的 是一个 历史点击行为序列ID  list,是 定长的 int 型 。定长 那就好办了呀 ,直接 接embeding matrix 拿到每个 id 对应 的 embeding 然后扔进模型里去。

这个 cell 里 的 代码是可以 跑通的,如果确实帮助到你了,欢迎 关注作者的公众号 凑个份子~

(1.2)tensorflow 2.0 直接使用 from_tensor_slices 生成 batch 数据

@ 欢迎关注作者公众号 算法全栈之路

import tensorflow as tf
tf.config.run_functions_eagerly(True)
print("eager_status:",tf.executing_eagerly())

import pandas as pd

batch_size = 3
max_len=5

raw_df['click_list'] = raw_df['click_list'].apply(lambda x: '#'.join(map(str, x)))
raw_df['age'] = raw_df['age'].astype(str)
raw_df['price'] = raw_df['price'].astype(str)

dataset = tf.data.Dataset.from_tensor_slices((raw_df[['age''price''sex''click_list']].values, raw_df['label'].values))

dataset = dataset.shuffle(buffer_size=len(raw_df)).batch(batch_size)

# Iterate over the batches
for batch in dataset:
    features, labels = batch
    # 定位到 序列特征所在位置 
    str_list_batch = features[:,3:4]
    list_feature=tf.strings.split(str_list_batch,"#")
    # 输出是一个SparseTensorValue对象
    # https://blog.csdn.net/ustbbsy/article/details/116644136
    print("ccccc:",list_feature.values)
    print(list_feature.shape)
    
    print('Features:', features)
    # 另一种定位序列特征的方式 
    print('Features(1):', features[1][3])
    print('Labels:', labels)
    print()

注意: 因为 我本机mac 的 tensorflow 版本是 2.6.0 的版本,所以这里tf默认就是2.6.0了。

我们 可以使用

tf.config.run_functions_eagerly(True)
print("eager_status:",tf.executing_eagerly()) 

来确认 是否启动了 tensorflow 2.x系列的 eager 模式

这里还是 重点说一些 序列特征吧,这里读入的是 把序列特征拼接成一个字符串 ,然后在对 每个batch里 进行 字符串的分割,我们 这里用到的 方法 是 :

 str_list_batch = features[:,3:4]
 list_feature=tf.strings.split(str_list_batch,"#")

注意 tf.strings.split 的 返回是一个 SparseTensorValue 对象, .values 属性可以拿到具体的值。

因为是 把序列特征拼接成了字符串,所以 我们这里 不要求序列长度是定长 的,非定长的序列特征处理 得到 SparseTensorValue 之后,我们可以使用 tf.Variable 或 tf.keras.layers.Embedding 来创建该嵌入矩阵。最后,我们可以使用 tf.nn.embedding_lookup_sparse()函数 来获取 嵌入向量。

最后在强调一点 就是:对于支持 eager模式的 dataset, 我们可以直接用for循环以及dict 来获取对应特征的取值 哦,非常方便,非常强大 ,使用前 注意确认 eager模式 是否开启

(1.3)使用 dataset 的 generate 生成 batch 数据

对于 数据量不太大 的训练数据,很多同学 还是 习惯 使用 python 的 yeild 来 构建generator , 所以 我们 也提供 了 基于 generator 来 生成 batch 样本 的方法,看代码吧 ~

@ 欢迎关注作者公众号 算法全栈之路

import tensorflow as tf
import pandas as pd
import numpy as np

# 创建一个虚拟的 pandas dataframe
df = pd.DataFrame({
    'float_col': np.random.rand(3),
    'int_col': np.random.randint(0, 10, size=(3)),
    'str_col': ['string{}'.format(i) for i in range(3)],
    'list_col': [[i, i+1] for i in range(3)]
})

print(df)


# 创建一个生成器函数,用于将 pandas dataframe 转换为 Tensorflow 数据集
def generator():
    for index, row in df.iterrows():
        yield (
            {
                'float_input': row['float_col'],
                'int_input': row['int_col'],
                'str_input': row['str_col'],
                'list_input': row['list_col']
            },
            row['int_col']  # 将 int_col 作为标签
        )

# 创建 Tensorflow 数据集
dataset = tf.data.Dataset.from_generator(generator, 
                                         output_signature=(
                                             {
                                                 'float_input': tf.TensorSpec(shape=(), dtype=tf.float32),
                                                 'int_input': tf.TensorSpec(shape=(), dtype=tf.int32),
                                                 'str_input': tf.TensorSpec(shape=(), dtype=tf.string),
                                                 'list_input': tf.TensorSpec(shape=(2,), dtype=tf.int32)
                                             },
                                             tf.TensorSpec(shape=(), dtype=tf.int32)
                                         ))

# 对数据进行批次处理
batch_size = 8
dataset = dataset.batch(batch_size)

# 打印数据集中的第一个批次
for feature_batch, label_batch in dataset:
    print('float_input:', feature_batch['float_input'])
    print('int_input:', feature_batch['int_input'])
    print('str_input:', feature_batch['str_input'])
    print('list_input:', feature_batch['list_input'])
    print('label:', label_batch)

这里的 重点 依然是 序列特征的处理 ,对于 定长 以及 非定长 的 序列特征,本文前面均 进行了 说明,这里 我就不在 强调 了,往上翻去 找找 就可以 看到哦。

(1.4)使用dataset 的 interleave 接口去读取 txt 样本文本文件

接下来 要介绍 的 两种方法,才是 我们在 工业上 大数据场景下 实际使用的 非常多的 特征数据 读入方法,看代码吧~

@ 欢迎关注作者公众号 算法全栈之路

import tensorflow as tf
print("eager_status:",tf.executing_eagerly())
tf.config.run_functions_eagerly(True)

# 训练集所有的列
TRAIN_SET_ALL_COLUMNS=["age""price""sex""click_list""label""click_list_str"]
# 没有用到的列,这里把去掉 
TRAIN_SET_USELESS_COLUMN_NAMES=['click_list']
# 并行度
NUM_PARALLEL_FOR_DATASET=1
BATCH_SIZE=2

def parse_txt_line(line, label_dtype):
    if label_dtype == tf.dtypes.float32:
        label_default_value = 0.0
    else:
        label_default_value = 0

    # int64类型的默认值,用long(0)也不好使,要设置一个真正大于int32的数值
    # 默认值个数必须和读入个数一致,很重要 ValueError: not enough values to unpack (expected 12, got 4)
    # 整数默认是 [1 << 32]
    # 默认值很重要,格式不对会导致这个问题
    # ValueError: Column dtype and SparseTensors dtype must be compatible. key: adid, column dtype:
    # <dtype: 'string'>, tensor dtype: <dtype: 'int64'>
    field_defaults = [ [""], [""], [""], [""],[label_default_value],[""]]
    # 从csv格式中解析出这些字段
    age, price, sex, click_list, label, click_list_str = tf.io.decode_csv(line, field_defaults, field_delim="\t")

    # 对一些字段使用 tf.cast 进行类型转换,这里完全不需要,下游有进行hash
    # adid = tf.cast(adid, tf.dtypes.int32)
    # | 号分隔,  tf.strings.to_number 把字符串转化为默认浮点数
    # user_click_seq = tf.strings.to_number(tf.strings.split(user_click_seq, sep="|"))
    label = tf.cast(label, tf.int64)

    fields_values = [age, price, sex, click_list, label, click_list_str]
    features = dict(zip(TRAIN_SET_ALL_COLUMNS, fields_values))

    # 没有用到de列,需要pop出去
    for useless_column_name in TRAIN_SET_USELESS_COLUMN_NAMES:
        features.pop(useless_column_name)
    label = features.pop("label")
    # 返回一个dict{feature_name,value} 和 label
    return features, label


def get_text_dataset(data_set_path_list, label_dtype):
    filenames_dataset = tf.data.Dataset.from_tensor_slices(data_set_path_list)
    raw_dataset = filenames_dataset.interleave(
        # 2个线程并行去读  TextLineDataset
        lambda x: tf.data.TextLineDataset(x, num_parallel_reads=NUM_PARALLEL_FOR_DATASET),
        # NUM_PARALLEL_FOR_DATASET=2
        cycle_length=NUM_PARALLEL_FOR_DATASET,
        block_length=BATCH_SIZE,
        num_parallel_calls=NUM_PARALLEL_FOR_DATASET
    )
    raw_dataset = raw_dataset. \
        map(lambda line: parse_txt_line(line, label_dtype), num_parallel_calls=NUM_PARALLEL_FOR_DATASET). \
        apply(tf.data.experimental.ignore_errors())

    # 格式 dict(fea_name,value) , label
    return raw_dataset


train_set_path_list=["read_sample.csv"]
train_raw_dataset = get_text_dataset(train_set_path_list, label_dtype=tf.dtypes.int64)

for feature_batch, label_batch in train_raw_dataset:
        print(feature_batch['age'])
        print(label_batch)

这里的 代码 是 工业大数据场景下 常用的方法,我们使用 tf.data.Dataset.from_tensor_slices 接口,一般会 先使用 tf.io.gfile 相关的接口 读取到 hdfs 大数据集群上的 文件路径 ,然后 tf.data.TextLineDataset并行读取,这里 的 方法 主要调用了  parse_txt_line 这个方法 来解析单行 的 样本文件。

这里的 序列特征,我们可以在 parse_txt_line 用 python方法 把处理成 list 数据,但是 要求定长,具体方法 看本文 开始的时候 的处理方法。当然,也可以在 获得 batch 得时候 用  tf.strings.split 进行处理,和上面开篇 第二种 方法一样。

更近一步,甚至 我们可以 将 序列特征字符串 一直 放到 模型里 去处理 都是可以的。

(1.5) 使用dataset 的 interleave 接口去读取 tfrecord 文件

这个方法是 企业级机器学习pipline 处理大数据量下 模型训练 用到最多 的方法,甚至 tfrecord 能够 兼容语音图像 等格式,这一块 感兴趣 的 同学自己下去 查看资料 吧,我们 这里 主要介绍的 都是 数值以及字符串列表搜广推算法 更多用到 的 特征数据。

@ 欢迎关注作者公众号 算法全栈之路

import tensorflow as tf
print("eager_status:",tf.executing_eagerly())
tf.config.run_functions_eagerly(True)

# 并行度
NUM_PARALLEL_FOR_DATASET=1
BATCH_SIZE=2

def get_tf_record_dataset(data_set_path_list,shuffle=True):
    files = tf.data.Dataset.list_files(data_set_path_list, shuffle=shuffle)
    dataset = files.apply(
        tf.data.experimental.parallel_interleave(
            lambda x: tf.data.TFRecordDataset(x, num_parallel_reads=NUM_PARALLEL_FOR_DATASET),
            cycle_length=NUM_PARALLEL_FOR_DATASET,
            block_length=BATCH_SIZE,
            sloppy=False
        )
    )
    
    # parsing_spec 是一个字典, 它提供了每个特征到 "FixedLenFeature" 或 "VarLenFeature" 的映射
    parsing_spec = {
        'age': tf.io.FixedLenFeature([1], tf.int64),
        'price': tf.io.FixedLenFeature([1],tf.float32),
        'gender': tf.io.FixedLenFeature([1], tf.string),
        'click_list': tf.io.VarLenFeature(tf.int64),
        'label': tf.io.FixedLenFeature([1],tf.int64)
    }
    
    def read_batch(serialized):
        
        feature = tf.io.parse_example(serialized, features=parsing_spec)
        label = feature['label']
        return feature, {"label": label}

    raw_tfrecord_data = dataset.map(read_batch, NUM_PARALLEL_FOR_DATASET)
    # 格式 dict(fea_name,value), label
    return raw_tfrecord_data


train_set_path_list=["py_tf_record"]
train_raw_dataset = get_tf_record_dataset(train_set_path_list)


for feature_batch, label_batch in train_raw_dataset:
        print("age:",feature_batch['age'])
        # 这里的 click_list 返回的是一个 SparseTensor, 用 .values 方法可以得到值。
        print("click_list:",feature_batch['click_list'].values)
        print('label:',label_batch)

特别推荐 这里介绍 的 处理数据 的方法,将训练数据 保存为 tfrecord 格式,不仅 速度快 而且 节省存储 空间,对 生成 tfrecord 数据 不熟悉的 同学,可以 去看 作者的 上一篇 文章 模型手把手系列开篇 之 python、spark 和 java 生成TFrecord 。

这里 重点要强调 的是  parsing_specread_batch 方法,parsing_spec 中 定义来 定长和变长 tfrecord 数据 的 解析方法,非常优秀,读出来得 序列特征 是 变长的 SparseTensor, 后面 处理得到 embeding 的方法,可与参考 上面文章 介绍的 SparseTensor 得到 embeding 得 部分内容 哦,这里我 就也 不再赘述 了。

本文 到这里 ,我们 共介绍了 5 种 tensorflow 读取数据 的方法,后两种 为 工业大数据模型 训练场景下 的 算法利器,强烈推荐。

加上 图上 deepwalk 算法理论与实战,图算法之瑞士军刀篇(一)   文章里使用的 自定义生成 batch 数据 的方法,共有  6种方法 来 适配不同的 业务数据 读取场景了,可以 算是 集 tensorflow 读取数据 的 大成之作了,每一个 小节 的 代码 均可以 独立运行成功,非常 值得收藏!

到这里, 模型手把手系列开篇 之  tensorflow 六种方法读入batch样本(含序列特征处理), 踩坑经验值得收藏  的 全文 就 写完 了。本文代码 每个模块 均可以 独立跑 成功,总有一款 适合你,希望 可以对你 有 参考作用 ~

码字不易,觉得有收获就动动小手转载一下吧,你的支持是我写下去的最大动力 ~

更多更全更新内容,欢迎关注作者的公众号: 算法全栈之路

- END -



Copyright © 2024 aigcdaily.cn  北京智识时代科技有限公司  版权所有  京ICP备2023006237号-1