In [1]:
import matplotlib as mpl
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np
import sklearn
import pandas as pd
import os
import sys
import time
import tensorflow as tf

from tensorflow import keras

print(tf.__version__)
print(sys.version_info)
for module in mpl, np, pd, sklearn, tf, keras:
    print(module.__name__, module.__version__)

2.2.0
sys.version_info(major=3, minor=6, micro=9, releaselevel='final', serial=0)
matplotlib 3.3.4
numpy 1.19.5
pandas 1.1.5
sklearn 0.24.2
tensorflow 2.2.0
tensorflow.keras 2.3.0-tf


In [2]:
!ls generate_csv

test_00.csv  test_08.csv   train_06.csv  train_14.csv  valid_02.csv
test_01.csv  test_09.csv   train_07.csv  train_15.csv  valid_03.csv
test_02.csv  train_00.csv  train_08.csv  train_16.csv  valid_04.csv
test_03.csv  train_01.csv  train_09.csv  train_17.csv  valid_05.csv
test_04.csv  train_02.csv  train_10.csv  train_18.csv  valid_06.csv
test_05.csv  train_03.csv  train_11.csv  train_19.csv  valid_07.csv
test_06.csv  train_04.csv  train_12.csv  valid_00.csv  valid_08.csv
test_07.csv  train_05.csv  train_13.csv  valid_01.csv  valid_09.csv


In [2]:
source_dir = "./generate_csv/"

#通过判断开头去添加文件
def get_filenames_by_prefix(source_dir, prefix_name):
    all_files = os.listdir(source_dir)
    results = []
    for filename in all_files:
        if filename.startswith(prefix_name):
            results.append(os.path.join(source_dir, filename))
    return results

train_filenames = get_filenames_by_prefix(source_dir, "train")
valid_filenames = get_filenames_by_prefix(source_dir, "valid")
test_filenames = get_filenames_by_prefix(source_dir, "test")

import pprint
pprint.pprint(train_filenames)
pprint.pprint(valid_filenames)
pprint.pprint(test_filenames)


['./generate_csv/train_08.csv',
 './generate_csv/train_11.csv',
 './generate_csv/train_18.csv',
 './generate_csv/train_15.csv',
 './generate_csv/train_17.csv',
 './generate_csv/train_00.csv',
 './generate_csv/train_01.csv',
 './generate_csv/train_19.csv',
 './generate_csv/train_14.csv',
 './generate_csv/train_02.csv',
 './generate_csv/train_16.csv',
 './generate_csv/train_09.csv',
 './generate_csv/train_03.csv',
 './generate_csv/train_12.csv',
 './generate_csv/train_10.csv',
 './generate_csv/train_13.csv',
 './generate_csv/train_05.csv',
 './generate_csv/train_07.csv',
 './generate_csv/train_04.csv',
 './generate_csv/train_06.csv']
['./generate_csv/valid_01.csv',
 './generate_csv/valid_05.csv',
 './generate_csv/valid_02.csv',
 './generate_csv/valid_04.csv',
 './generate_csv/valid_08.csv',
 './generate_csv/valid_07.csv',
 './generate_csv/valid_06.csv',
 './generate_csv/valid_00.csv',
 './generate_csv/valid_09.csv',
 './generate_csv/valid_03.csv']
['./generate_csv/test_00.csv',
 './gener

In [3]:
#下面的接口都是之前用过的
def parse_csv_line(line, n_fields = 9):
    defs = [tf.constant(np.nan)] * n_fields
    parsed_fields = tf.io.decode_csv(line, record_defaults=defs)
    x = tf.stack(parsed_fields[0:-1])
    y = tf.stack(parsed_fields[-1:])
    return x, y

def csv_reader_dataset(filenames, n_readers=5,
                       batch_size=32, n_parse_threads=5,
                       shuffle_buffer_size=10000):
    dataset = tf.data.Dataset.list_files(filenames)
    dataset = dataset.repeat()
    dataset = dataset.interleave(
        lambda filename: tf.data.TextLineDataset(filename).skip(1),
        cycle_length = n_readers
    )
    dataset.shuffle(shuffle_buffer_size)
    dataset = dataset.map(parse_csv_line,
                          num_parallel_calls=n_parse_threads)
    dataset = dataset.batch(batch_size)
    return dataset

batch_size = 32
train_set = csv_reader_dataset(train_filenames,
                               batch_size = batch_size)
valid_set = csv_reader_dataset(valid_filenames,
                               batch_size = batch_size)
test_set = csv_reader_dataset(test_filenames,
                              batch_size = batch_size)


In [4]:
!ls

chapter_4.tar.gz	      tf02_data_generate_csv.ipynb
generate_csv		      tf03-tfrecord_basic_api.ipynb
generate_tfrecords	      tf04_data_generate_tfrecord.ipynb
temp.csv		      tfrecord_basic
tf01-dataset_basic_api.ipynb


# 把train_set,valid_set,test_set 存储到tfrecord类型的文件中

In [5]:
#把基础的如何序列化的步骤搞到一个函数
def serialize_example(x, y):
    """Converts x, y to tf.train.Example and serialize"""
    input_feautres = tf.train.FloatList(value = x) #特征
    label = tf.train.FloatList(value = y)#标签
    features = tf.train.Features(
        feature = {
            "input_features": tf.train.Feature(
                float_list = input_feautres),
            "label": tf.train.Feature(float_list = label)
        }
    )
    #把features变为example
    example = tf.train.Example(features = features)
    return example.SerializeToString()  #把example序列化
#n_shards是存为多少个文件，steps_per_shard和 steps_per_epoch类似
def csv_dataset_to_tfrecords(base_filename, dataset,
                             n_shards, steps_per_shard,
                             compression_type = None):
    #压缩文件类型
    options = tf.io.TFRecordOptions(
        compression_type = compression_type)
    all_filenames = []
    
    for shard_id in range(n_shards):
        filename_fullpath = '{}_{:05d}-of-{:05d}'.format(
            base_filename, shard_id, n_shards) #base_filename是一个前缀
        #打开文件
        with tf.io.TFRecordWriter(filename_fullpath, options) as writer:
            #取出数据,为什么skip，上一个文件写了前500行，下一个文件存后面的数据
            for x_batch, y_batch in dataset.skip(shard_id * steps_per_shard).take(steps_per_shard):
                for x_example, y_example in zip(x_batch, y_batch):
                    writer.write(
                        serialize_example(x_example, y_example))
        all_filenames.append(filename_fullpath)
    #返回所有tfrecord文件名
    return all_filenames

In [6]:
!rm -rf generate_tfrecords

In [7]:
for i in train_set.take(1):
    print(i)  

(<tf.Tensor: shape=(32, 8), dtype=float32, numpy=
array([[ 8.01544309e-01,  2.72161424e-01, -1.16243929e-01,
        -2.02311516e-01, -5.43051600e-01, -2.10396163e-02,
        -5.89762092e-01, -8.24184567e-02],
       [ 4.85305160e-01, -8.49241912e-01, -6.53012618e-02,
        -2.33796556e-02,  1.49743509e+00, -7.79065788e-02,
        -9.02363241e-01,  7.81451464e-01],
       [-6.67222738e-01, -4.82395217e-02,  3.45294058e-01,
         5.38266838e-01,  1.85218394e+00, -6.11253828e-02,
        -8.41709316e-01,  1.52048469e+00],
       [ 6.30343556e-01,  1.87416613e+00, -6.71321452e-02,
        -1.25433668e-01, -1.97375536e-01, -2.27226317e-02,
        -6.92407250e-01,  7.26523340e-01],
       [ 6.36364639e-01, -1.08954263e+00,  9.26090255e-02,
        -2.05381244e-01,  1.20256710e+00, -3.63012254e-02,
        -6.78410172e-01,  1.82235345e-01],
       [-2.98072815e-01,  3.52261662e-01, -1.09205075e-01,
        -2.50555217e-01, -3.40640247e-02, -6.03400404e-03,
         1.08055484e+00, -1

In [8]:
%%time
# 训练集和测试集都分20
n_shards = 20
train_steps_per_shard = 11610 // batch_size // n_shards
valid_steps_per_shard = 3880 // batch_size // 10
test_steps_per_shard = 5170 // batch_size // 10

output_dir = "generate_tfrecords"
if not os.path.exists(output_dir):
    os.mkdir(output_dir)

train_basename = os.path.join(output_dir, "train")
valid_basename = os.path.join(output_dir, "valid")
test_basename = os.path.join(output_dir, "test")

train_tfrecord_filenames = csv_dataset_to_tfrecords(
    train_basename, train_set, n_shards, train_steps_per_shard, None)
valid_tfrecord_filenames = csv_dataset_to_tfrecords(
    valid_basename, valid_set, 10, valid_steps_per_shard, None)
test_tfrecord_fielnames = csv_dataset_to_tfrecords(
    test_basename, test_set, 10, test_steps_per_shard, None)
#执行会发现目录下总计生成了60个文件,这里文件数目改为一致，为了对比时间

CPU times: user 40 s, sys: 8.95 s, total: 48.9 s
Wall time: 42.6 s


In [9]:
!ls -l generate_tfrecords

总用量 1960
-rw-rw-r-- 1 luke luke 47616 Jul 23 11:33 test_00000-of-00010
-rw-rw-r-- 1 luke luke 47616 Jul 23 11:33 test_00001-of-00010
-rw-rw-r-- 1 luke luke 47616 Jul 23 11:33 test_00002-of-00010
-rw-rw-r-- 1 luke luke 47616 Jul 23 11:33 test_00003-of-00010
-rw-rw-r-- 1 luke luke 47616 Jul 23 11:33 test_00004-of-00010
-rw-rw-r-- 1 luke luke 47616 Jul 23 11:33 test_00005-of-00010
-rw-rw-r-- 1 luke luke 47616 Jul 23 11:33 test_00006-of-00010
-rw-rw-r-- 1 luke luke 47616 Jul 23 11:33 test_00007-of-00010
-rw-rw-r-- 1 luke luke 47616 Jul 23 11:33 test_00008-of-00010
-rw-rw-r-- 1 luke luke 47616 Jul 23 11:33 test_00009-of-00010
-rw-rw-r-- 1 luke luke 53568 Jul 23 11:32 train_00000-of-00020
-rw-rw-r-- 1 luke luke 53568 Jul 23 11:32 train_00001-of-00020
-rw-rw-r-- 1 luke luke 53568 Jul 23 11:32 train_00002-of-00020
-rw-rw-r-- 1 luke luke 53568 Jul 23 11:32 train_00003-of-00020
-rw-rw-r-- 1 luke luke 53568 Jul 23 11:32 train_00004-of-00020
-rw-rw-r-- 1 luke luke 53568 Jul 23 11:3

In [33]:
#生成一下压缩的
# n_shards = 20
# train_steps_per_shard = 11610 // batch_size // n_shards
# valid_steps_per_shard = 3880 // batch_size // n_shards
# test_steps_per_shard = 5170 // batch_size // n_shards

# output_dir = "generate_tfrecords_zip"
# if not os.path.exists(output_dir):
#     os.mkdir(output_dir)

# train_basename = os.path.join(output_dir, "train")
# valid_basename = os.path.join(output_dir, "valid")
# test_basename = os.path.join(output_dir, "test")
# #只需修改参数的类型即可
# train_tfrecord_filenames = csv_dataset_to_tfrecords(
#     train_basename, train_set, n_shards, train_steps_per_shard,
#     compression_type = "GZIP")
# valid_tfrecord_filenames = csv_dataset_to_tfrecords(
#     valid_basename, valid_set, n_shards, valid_steps_per_shard,
#     compression_type = "GZIP")
# test_tfrecord_fielnames = csv_dataset_to_tfrecords(
#     test_basename, test_set, n_shards, test_steps_per_shard,
#     compression_type = "GZIP")

In [34]:
!ls -l generate_tfrecords_zip

总用量 860
-rw-rw-r-- 1 luke luke 10171 May  7 11:16 test_00000-of-00020
-rw-rw-r-- 1 luke luke 10230 May  7 11:16 test_00001-of-00020
-rw-rw-r-- 1 luke luke 10204 May  7 11:16 test_00002-of-00020
-rw-rw-r-- 1 luke luke 10213 May  7 11:16 test_00003-of-00020
-rw-rw-r-- 1 luke luke 10229 May  7 11:16 test_00004-of-00020
-rw-rw-r-- 1 luke luke 10200 May  7 11:16 test_00005-of-00020
-rw-rw-r-- 1 luke luke 10199 May  7 11:16 test_00006-of-00020
-rw-rw-r-- 1 luke luke 10215 May  7 11:16 test_00007-of-00020
-rw-rw-r-- 1 luke luke 10179 May  7 11:16 test_00008-of-00020
-rw-rw-r-- 1 luke luke 10149 May  7 11:16 test_00009-of-00020
-rw-rw-r-- 1 luke luke 10141 May  7 11:16 test_00010-of-00020
-rw-rw-r-- 1 luke luke 10221 May  7 11:16 test_00011-of-00020
-rw-rw-r-- 1 luke luke 10209 May  7 11:16 test_00012-of-00020
-rw-rw-r-- 1 luke luke 10214 May  7 11:16 test_00013-of-00020
-rw-rw-r-- 1 luke luke 10212 May  7 11:16 test_00014-of-00020
-rw-rw-r-- 1 luke luke 10209 May  7 11:16 test

In [10]:
#打印一下文件名
pprint.pprint(train_tfrecord_filenames)
pprint.pprint(valid_tfrecord_filenames)
pprint.pprint(test_tfrecord_fielnames)

['generate_tfrecords/train_00000-of-00020',
 'generate_tfrecords/train_00001-of-00020',
 'generate_tfrecords/train_00002-of-00020',
 'generate_tfrecords/train_00003-of-00020',
 'generate_tfrecords/train_00004-of-00020',
 'generate_tfrecords/train_00005-of-00020',
 'generate_tfrecords/train_00006-of-00020',
 'generate_tfrecords/train_00007-of-00020',
 'generate_tfrecords/train_00008-of-00020',
 'generate_tfrecords/train_00009-of-00020',
 'generate_tfrecords/train_00010-of-00020',
 'generate_tfrecords/train_00011-of-00020',
 'generate_tfrecords/train_00012-of-00020',
 'generate_tfrecords/train_00013-of-00020',
 'generate_tfrecords/train_00014-of-00020',
 'generate_tfrecords/train_00015-of-00020',
 'generate_tfrecords/train_00016-of-00020',
 'generate_tfrecords/train_00017-of-00020',
 'generate_tfrecords/train_00018-of-00020',
 'generate_tfrecords/train_00019-of-00020']
['generate_tfrecords/valid_00000-of-00010',
 'generate_tfrecords/valid_00001-of-00010',
 'generate_tfrecords/valid_00002

In [11]:
%%time
#把数据读取出来
expected_features = {
    "input_features": tf.io.FixedLenFeature([8], dtype=tf.float32),
    "label": tf.io.FixedLenFeature([1], dtype=tf.float32)
}

def parse_example(serialized_example):
    example = tf.io.parse_single_example(serialized_example,
                                         expected_features)
    return example["input_features"], example["label"]

def tfrecords_reader_dataset(filenames, n_readers=5,
                             batch_size=32, n_parse_threads=5,
                             shuffle_buffer_size=10000):
    dataset = tf.data.Dataset.list_files(filenames)
    dataset = dataset.repeat() #为了能够无限次epoch
    dataset = dataset.interleave(
#         lambda filename: tf.data.TFRecordDataset(
#             filename, compression_type = "GZIP"),
          lambda filename: tf.data.TFRecordDataset(
            filename),
          cycle_length = n_readers
    )
    #洗牌，就是给数据打乱,样本顺序打乱
    dataset.shuffle(shuffle_buffer_size)
    dataset = dataset.map(parse_example,
                          num_parallel_calls=n_parse_threads)#把对应的一个样本是字节流的，变为浮点类型
    dataset = dataset.batch(batch_size)  #原来写进去是一条一条的sample，要分配
    return dataset

#测试一下，tfrecords_reader_dataset是否可以正常运行
# tfrecords_train = tfrecords_reader_dataset(train_tfrecord_filenames,
#                                            batch_size = 3)
# for x_batch, y_batch in tfrecords_train.take(10):
#     print(x_batch)
#     print(y_batch)

CPU times: user 58 µs, sys: 14 µs, total: 72 µs
Wall time: 80.1 µs


In [12]:
%%time
#得到dataset,dataset是tensor，可以直接拿tensor训练

batch_size = 32
tfrecords_train_set = tfrecords_reader_dataset(
    train_tfrecord_filenames, batch_size = batch_size)
tfrecords_valid_set = tfrecords_reader_dataset(
    valid_tfrecord_filenames, batch_size = batch_size)
tfrecords_test_set = tfrecords_reader_dataset(
    test_tfrecord_fielnames, batch_size = batch_size)

Please report this to the TensorFlow team. When filing the bug, set the verbosity to 10 (on Linux, `export AUTOGRAPH_VERBOSITY=10`) and attach the full output.
Cause: Unable to locate the source code of <function tfrecords_reader_dataset.<locals>.<lambda> at 0x7f98284712f0>. Note that functions defined in certain environments, like the interactive Python shell do not expose their source code. If that is the case, you should to define them in a .py source file. If you are certain the code is graph-compatible, wrap the call using @tf.autograph.do_not_convert. Original error: could not get source code
Please report this to the TensorFlow team. When filing the bug, set the verbosity to 10 (on Linux, `export AUTOGRAPH_VERBOSITY=10`) and attach the full output.
Cause: Unable to locate the source code of <function tfrecords_reader_dataset.<locals>.<lambda> at 0x7f98284712f0>. Note that functions defined in certain environments, like the interactive Python shell do not expose their source code

In [17]:
type(tfrecords_train_set)

tensorflow.python.data.ops.dataset_ops.BatchDataset

In [13]:
for i in tfrecords_train_set.take(1):
    print(i)

(<tf.Tensor: shape=(32, 8), dtype=float32, numpy=
array([[-1.33849168e+00,  1.15326405e+00, -5.64006925e-01,
        -1.73587687e-02, -1.50110144e-02,  1.30692095e-01,
        -7.62392581e-01,  6.61608279e-01],
       [ 1.69143653e+00, -4.48740691e-01,  1.74070787e+00,
         2.62504518e-01,  3.60605478e-01,  3.49154733e-02,
        -9.30357397e-01,  8.06418836e-01],
       [-3.68034393e-01, -1.00944233e+00,  9.95716763e+00,
         8.32341957e+00, -1.11282730e+00, -1.44638717e-01,
         1.34183347e+00, -2.12248623e-01],
       [ 6.85438991e-01, -5.28840959e-01,  2.43798941e-01,
        -1.07175767e-01, -3.28932047e-01, -7.14625940e-02,
        -5.66433609e-01, -9.73988622e-02],
       [ 2.22390127e+00, -1.00944233e+00,  1.12938309e+00,
         6.29329979e-01, -4.16031510e-01,  3.57014686e-01,
        -7.34398425e-01,  3.57006729e-01],
       [-3.75547409e-01,  7.52762854e-01, -5.35495043e-01,
        -6.88404664e-02,  8.90460610e-01,  8.53771530e-03,
         9.73243952e-01, -1

In [14]:
#开始训练
model = keras.models.Sequential([
    keras.layers.Dense(30, activation='relu',
                       input_shape=[8]),
    keras.layers.Dense(1),
])
model.compile(loss="mean_squared_error", optimizer="sgd")
callbacks = [keras.callbacks.EarlyStopping(
    patience=5, min_delta=1e-2)]

history = model.fit(tfrecords_train_set,
                    validation_data = tfrecords_valid_set,
                    steps_per_epoch = 11160 // batch_size,
                    validation_steps = 3870 // batch_size,
                    epochs = 100,
                    callbacks = callbacks)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100


In [20]:
model.evaluate(tfrecords_test_set, steps = 5160 // batch_size)



0.33755674958229065