完整模拟题1 - 数据生成代码

(约 9.21K 字)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
"""
数据生成脚本 - 星点视频数据集
用途:为竞赛训练生成原始脏数据(raw_videos.csv)及清洗后的干净数据(cleaned_videos.csv)
运行方式:python generate_data.py
数据量:默认200,000条,可调整 TOTAL_RECORDS
"""

import pandas as pd
import numpy as np
import random
from datetime import datetime, timedelta

# ========== Configuration ==========
TOTAL_RECORDS = 200000
SEED = 2025
OUTPUT_RAW = "raw_videos.csv"
OUTPUT_CLEAN = "cleaned_videos.csv"

# Vocabulary for the `category` column.
CATEGORIES = [
    "tv", "movie", "variety", "anime", "doc",
    "game", "music", "knowledge", "tech", "life",
    "food", "fashion", "sports", "auto", "animal"
]

# Pool of (Chinese) tag labels for the `tags` column.
TAG_POOL = ["热门", "推荐", "精选", "新作", "搞笑", "剧情", "治愈", "燃向", "经典", "连载"]

# ========== Seed both RNGs so the generated dataset is reproducible ==========
random.seed(SEED)
np.random.seed(SEED)

# ========== 辅助函数 ==========
def generate_video_id(idx):
    """Return a unique ID of the form VID-XXXXXX for row index `idx`."""
    return "VID-" + str(100000 + idx)

def random_duration(good=True):
    """Produce a "MM:SS" duration string.

    With good=True the value is always well-formed; with good=False the
    result is deliberately corrupted (missing, out-of-range, or garbage).
    """
    mm = random.randint(0, 180)
    ss = random.randint(0, 59)
    if good:
        return f"{mm:02d}:{ss:02d}"

    flaw = random.choice(["missing", "bad_format", "bogus"])
    if flaw == "missing":
        # Half NaN, half empty string.
        return np.nan if random.random() < 0.5 else ""
    if flaw == "bad_format":
        # Both fields drawn from 0-99, so seconds may illegally exceed 59.
        left = random.randint(0, 99)
        right = random.randint(0, 99)
        return f"{left}:{right}"
    # "bogus": pure garbage text or a wildly out-of-range seconds field.
    return "abcd" if random.random() < 0.5 else "12:345"

def random_tags(allow_dup=True):
    """Return 1-4 comma-joined tags; may deliberately duplicate one.

    With allow_dup=True there is a 10% chance (when two or more tags were
    drawn) that the second tag is overwritten with the first, injecting a
    duplicate for the cleaning exercise.
    """
    count = random.randint(1, 4)
    picks = random.choices(TAG_POOL, k=count)
    # NOTE: the random.random() draw is short-circuited away unless both
    # preconditions hold, which keeps the RNG stream identical.
    if allow_dup and count > 1 and random.random() < 0.1:
        picks[1] = picks[0]
    return ",".join(picks)

def random_int_with_anomaly(mean, std, anomaly_prob=0.02, missing_prob=0.02):
    """Draw a positive integer metric, occasionally missing or anomalous.

    Returns np.nan with probability missing_prob, a zero/negative anomaly
    with probability anomaly_prob, and otherwise a log-normal sample
    (mean-centered, sigma=std) clamped to be at least 1.
    """
    if random.random() < missing_prob:
        return np.nan
    if random.random() < anomaly_prob:
        # Anomaly: exactly 0, or a negative value down to -1000.
        candidates = [0, -random.randint(1, 1000)]
        return random.choice(candidates)
    # Normal case: log-normal around `mean`; max(mean, 1) guards log(0).
    sample = np.random.lognormal(mean=np.log(max(mean, 1)), sigma=std)
    return int(max(1, sample))

# ========== Generate the raw (dirty) dataset ==========
print(f"开始生成 {TOTAL_RECORDS} 条原始数据...")
rows = []
for i in range(TOTAL_RECORDS):
    if i % 50000 == 0:
        print(f" 已生成 {i} 条...")

    video_id = generate_video_id(i)
    category = random.choice(CATEGORIES)

    # Engagement metrics: log-normal draws with deliberate anomalies and
    # missing values mixed in (means ~500k / 5k / 200k / 8k respectively).
    plays = random_int_with_anomaly(500000, 1.2, anomaly_prob=0.015, missing_prob=0.02)
    comments = random_int_with_anomaly(5000, 1.5, anomaly_prob=0.015, missing_prob=0.02)
    likes = random_int_with_anomaly(200000, 1.3, anomaly_prob=0.015, missing_prob=0.02)
    favorites = random_int_with_anomaly(8000, 1.4, anomaly_prob=0.015, missing_prob=0.02)

    # ~10% of durations are intentionally malformed.
    duration = random_duration(good=(random.random() > 0.1))

    # ~2% of tag fields are missing; the rest may contain duplicates.
    tags = np.nan if random.random() < 0.02 else random_tags(allow_dup=True)

    rows.append([video_id, category, plays, comments, likes, favorites, duration, tags])

# Append exact duplicate rows (~0.5% of the base volume).
num_dup = int(TOTAL_RECORDS * 0.005)
if num_dup > 0:
    for _ in range(num_dup):
        idx = random.randint(0, TOTAL_RECORDS - 1)
        rows.append(rows[idx][:])  # shallow copy => identical row content

# Inject ~50 extreme play-count outliers (tens of millions).
for _ in range(50):
    idx = random.randint(0, len(rows) - 1)
    rows[idx][2] = random.randint(10000000, 50000000)

df_raw = pd.DataFrame(rows, columns=["video_id", "category", "plays", "comments", "likes", "favorites", "duration", "tags"])
print(f"原始数据生成完毕,总计 {len(df_raw)} 行。")

# Persist the raw dirty dataset.
df_raw.to_csv(OUTPUT_RAW, index=False)
print(f"已保存原始数据至 {OUTPUT_RAW}")

# ========== Data cleaning (per the exam requirements) ==========
print("\n开始清洗数据...")
# Work on a copy so the raw DataFrame stays intact for comparison.
df = df_raw.copy()

# 1. Convert `duration` strings to seconds.
def duration_to_seconds(val):
    """Convert an "MM:SS" duration string to total seconds.

    Returns np.nan for missing values, non-strings, malformed strings,
    negative components, or a seconds field outside 0-59; otherwise an int.
    """
    if pd.isna(val) or not isinstance(val, str):
        return np.nan
    parts = val.strip().split(":")
    if len(parts) != 2:
        return np.nan
    try:
        m, s = int(parts[0]), int(parts[1])
    except ValueError:  # non-numeric field, e.g. "abcd" (was a bare except)
        return np.nan
    # Reject impossible clock values: negative minutes/seconds or seconds >= 60.
    if m < 0 or not 0 <= s < 60:
        return np.nan
    return m * 60 + s

df["duration_seconds"] = df["duration"].apply(duration_to_seconds)

# 2. 删除任一字段缺失的行(包括 duration_seconds 为NaN)
cols_to_check = ["video_id", "category", "plays", "comments", "likes", "favorites", "tags", "duration_seconds"]
df = df.dropna(subset=cols_to_check)

# 3. 将 plays, comments, likes, favorites 转换为数值类型,无法转换的变为 NaN 然后删除
for col in ["plays", "comments", "likes", "favorites"]:
df[col] = pd.to_numeric(df[col], errors='coerce')
df = df.dropna(subset=["plays", "comments", "likes", "favorites"])

# 4. 转换为整数
for col in ["plays", "comments", "likes", "favorites"]:
df[col] = df[col].astype(int)

# 5. 删除负数或零值
for col in ["plays", "comments", "likes", "favorites"]:
df = df[df[col] > 0]

# 6. Deduplicate repeated tags within each row's tag string.
def dedup_tags(tag_str):
    """Collapse duplicate comma-separated tags, keeping first-seen order.

    Non-string inputs (e.g. NaN) are returned unchanged; whitespace around
    tags is stripped and empty fragments are discarded.
    """
    if not isinstance(tag_str, str):
        return tag_str
    stripped = (t.strip() for t in tag_str.split(","))
    # dict.fromkeys preserves insertion order and drops repeats.
    return ",".join(dict.fromkeys(t for t in stripped if t))

df["tags"] = df["tags"].apply(dedup_tags)

# 7. 删除完全重复的行(基于所有列)
df = df.drop_duplicates()

# 8. 删除 plays 大于 99.9分位数的极端离群值
threshold = df["plays"].quantile(0.999)
df = df[df["plays"] <= threshold]

# 9. 按 plays 降序排列,重置索引
df = df.sort_values("plays", ascending=False).reset_index(drop=True)

print(f"清洗完成,剩余 {len(df)} 行。")

# 保存清洗后数据
df.to_csv(OUTPUT_CLEAN, index=False)
print(f"已保存清洗后数据至 {OUTPUT_CLEAN}")

"""
机器学习数据集生成脚本
从清洗后的数据中划分训练集和预测集,并分别生成回归/分类专用文件
"""
from sklearn.model_selection import train_test_split

# 读取清洗后的完整数据
df = pd.read_csv("cleaned_videos.csv")

# 检查必要列是否存在
required = ["video_id", "category", "plays", "comments", "likes", "favorites", "duration_seconds", "tags"]
for col in required:
if col not in df.columns:
raise ValueError(f"缺少列: {col}")

# 划分训练集和预测集(80/20 分层抽样,按 category 保持分布)
train_idx, predict_idx = train_test_split(
df.index, test_size=0.2, random_state=2025, stratify=df["category"]
)
train_df = df.iloc[train_idx].copy()
predict_df = df.iloc[predict_idx].copy()

# ---------- 回归任务 (预测 plays) ----------
# 训练文件:保留所有特征和目标
reg_train = train_df[["video_id", "comments", "likes", "favorites", "duration_seconds", "tags", "plays"]]
reg_train.to_csv("ml_reg_train.csv", index=False)

# 预测文件:去掉目标 plays
reg_predict = predict_df[["video_id", "comments", "likes", "favorites", "duration_seconds", "tags"]]
reg_predict.to_csv("ml_reg_predict.csv", index=False)

# ---------- 分类任务 (预测 category) ----------
# 训练文件:保留特征和目标
clf_train = train_df[["video_id", "comments", "likes", "favorites", "duration_seconds", "tags", "category"]]
clf_train.to_csv("ml_clf_train.csv", index=False)

# 预测文件:去掉目标 category
clf_predict = predict_df[["video_id", "comments", "likes", "favorites", "duration_seconds", "tags"]]
clf_predict.to_csv("ml_clf_predict.csv", index=False)

print("生成完成。")
print(f"回归训练集: {len(reg_train)} 条")
print(f"回归预测集: {len(reg_predict)} 条 (不含 plays)")
print(f"分类训练集: {len(clf_train)} 条")
print(f"分类预测集: {len(clf_predict)} 条 (不含 category)")
print("完成!")
分享