封面图片 PID:40516130

简要说明

因为咱喜欢收集东西,常会遇到需要处理大量文件且重复性工作较多的情况。所以写了一些小工具,并撰写了相关简要文档以方便使用。

在下文的调用方式说明中,[ ]表示可选参数,[{ }]表示可选参数的可选值,无[ ]表示必选参数

rmsf(remove_same_file)

主要功能描述

移除指定目录下的指定类型的重复文件,通过比较文件的hash值判断文件是否重复。

实现流程描述

  1. 读取指定目录下的所有指定后缀的文件
  2. 默认按文件名的长度从小到大进行排序
  3. 依次计算文件的 hash 值,若文件 hash 值相同,则认为文件重复,将重复文件移除,默认将重复文件移入回收站

使用说明

此工具使用python编写,调用方式为:

1
python rmsf.py [-h] [-d DIR] [-r {0,1,2}] [-v] [-V] [-c {hash,md5,sha1,sha256}] [-e EXCLUDE [EXCLUDE ...]] [-re {y,yes,n,no}] [-t THREAD]

接受的可选参数及其释义如下表所示:

参数 可选值 释义 默认值 备注
-h/--help 显示帮助信息
-d/--dir 目录路径 指定要移除重复文件的目录路径 当前工作目录
-r/--recursive 0, 1, 2 0:不进行递归,仅处理当前目录;1:递归搜索子目录,子目录中执行相同的操作;2:递归搜索子目录,将子目录中的文件与父目录中的文件进行比较 0
-v/--verbose 显示详细操作信息 依次显示已处理的文件数量,剩余的文件数量,总共需要处理的文件数量,被删除的文件的详细信息等内容
-V/--version 显示版本信息
-c/--comparation hash, md5, sha1, sha256 指定比较文件的方式 hash
-e/--exclude 文件后缀 指定后缀的文件 png,jpg 可同时处理多个文件后缀,以逗号分隔,如:-e png,jpg
-re/--recycle y,yes,n,no 是否删除文件到回收站 y
-t/--thread 线程数 指定线程数 1 线程数不能超过系统支持的最大线程数,如果指定的线程数超过系统支持的最大线程数,则弹出警告并使用系统支持的最大线程数

备注

  1. 若执行过程中出现异常,打印异常信息并跳过当前文件,继续执行下一个文件。
  2. 支持多线程

源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
import os
import hashlib
import argparse
import threading


def get_file_hash(file_path, hash_type="hash"):
"""
计算文件的哈希值。
:param file_path: 文件的路径
:param hash_type: 哈希类型,可以是 'hash'、'md5'、'sha1' 或 'sha256'
:return: 计算得到的哈希值
"""
hash_functions = {
"hash": hashlib.sha256,
"md5": hashlib.md5,
"sha1": hashlib.sha1,
"sha256": hashlib.sha256,
}
hash_func = hash_functions[hash_type]()
with open(file_path, "rb") as file:
while True:
data = file.read(65536)
if not data:
break
hash_func.update(data)
return hash_func.hexdigest()


def remove_duplicate_files_in_thread(
file_list, file_hashes, recycle, verbose, total_files, recursive, hash_type
):
"""
在线程中移除重复文件。
:param file_list: 要处理的文件列表
:param file_hashes: 存储文件哈希的字典
:param recycle: 是否删除文件到回收站
:param verbose: 是否显示详细操作信息
:param total_files: 总共需要处理的文件数量
:param recursive: 是否递归处理子目录
:param hash_type: 哈希类型
"""
processed_files = 0
count = 0
print("Beginning to remove duplicate files, please wait...")
print("Total files to be processed: " + str(total_files))
for file_path in file_list:
try:
# 获取文件哈希
file_hash = get_file_hash(file_path, hash_type)
temp = True

if recursive == 1:
if file_hash in file_hashes:
temp_hash_dirs = [
os.path.dirname(f) for f in file_hashes[file_hash]
]
else:
temp_hash_dirs = []
if os.path.dirname(file_path) not in temp_hash_dirs:
temp = False
# 如果文件哈希在字典中,说明是重复文件
if file_hash in file_hashes and temp:
# 如果选择删除文件到回收站
if recycle in ["y", "yes"]:
try:
# 尝试使用send2trash模块删除文件到回收站
import send2trash

send2trash.send2trash(file_path)
except ImportError:
# 如果send2trash模块未安装,则直接删除文件
print(
f"send2trash not installed. Deleting file {file_path} permanently."
)
os.remove(file_path)
else:
# 如果不选择删除文件到回收站,则直接删除文件
os.remove(file_path)
# 如果显示详细操作信息
if verbose:
# 更新已处理的文件数量
processed_files += 1
count += 1
# 计算剩余文件数量
remaining_files = total_files - processed_files
# 打印已处理、剩余、总文件数量和已删除的文件路径
print(
f"Processed: {processed_files}, Remaining: {remaining_files}, Deleted: {file_path}" # , Total: {total_files}
)
else:

# 如果文件哈希不在字典中,说明不是重复文件,将其添加到字典中
if file_hash not in file_hashes:
file_hashes[file_hash] = []
file_hashes[file_hash].append(file_path)
processed_files += 1
# 如果显示详细操作信息
# if verbose:
# # 更新已处理的文件数量
# processed_files += 1
# # 计算剩余文件数量
# remaining_files = total_files - processed_files
# # 打印已处理、剩余、总文件数量
# print(
# f"Processed: {processed_files}, Remaining: {remaining_files}, Total: {total_files}" # , Kept: {file_path}
# )
except Exception as e:
# 如果处理文件时发生错误,打印错误信息
print(f"Error processing file {file_path}: {e}")
print("Processed complete, " + str(count) + " files deleted")


def remove_duplicate_files(
directory,
recursive=0,
verbose=False,
hash_type="hash",
exclude=["png", "jpg"],
recycle="y",
threads=1,
):
"""
移除指定目录下的重复文件。
:param directory: 要操作的目录
:param recursive: 递归模式,0 表示不递归,1 表示递归搜索子目录,子目录中执行相同操作,2 表示递归搜索子目录,将子目录中的文件与父目录中的文件进行比较
:param verbose: 是否显示详细操作信息
:param hash_type: 比较文件的方式
:param exclude: 要排除的文件后缀列表
:param recycle: 是否删除文件到回收站,'y' 或 'yes' 表示是,'no' 或 'n' 表示否
:param threads: 线程数
"""
file_hashes = {}
file_list = []
total_files = 0
for root, dirs, files in os.walk(directory):
if recursive == 0:
dirs[:] = [] # 不继续递归子目录
for file in files:
file_ext = os.path.splitext(file)[1][1:].lower()
if file_ext not in exclude:
continue
file_path = os.path.join(root, file)
file_list.append(file_path)
total_files += 1

# 按文件名长度排序文件列表
file_list.sort(key=lambda x: len(os.path.basename(x)))

# 限制线程数不超过系统支持的最大线程数
import multiprocessing

threads = min(threads, multiprocessing.cpu_count())
if threads == 0:
threads = 1
file_lists = [[] for _ in range(threads)]
for i, file_path in enumerate(file_list):
file_lists[i % threads].append(file_path)

threads_list = []
for sub_list in file_lists:
thread = threading.Thread(
target=remove_duplicate_files_in_thread,
args=(
sub_list,
file_hashes,
recycle,
verbose,
total_files,
recursive,
hash_type,
),
)
threads_list.append(thread)
thread.start()
for thread in threads_list:
thread.join()


def main():
"""
主函数,解析命令行参数并调用 remove_duplicate_files 函数。
"""
parser = argparse.ArgumentParser(
description="Remove duplicate files from a directory."
)
parser.add_argument(
"-d",
"--dir",
default=os.getcwd(),
help="Directory to remove duplicate files from. Default is the current working directory.",
)
parser.add_argument(
"-r",
"--recursive",
type=int,
choices=[0, 1, 2],
default=0,
help="Recursive mode. 0: Do not recurse, only process the current directory; 1: Recursive search subdirectories and perform the same operation; 2: Recursive search subdirectories and compare files in subdirectories with those in the parent directory.",
)
parser.add_argument(
"-v", "--verbose", action="store_true", help="Show verbose information."
)
parser.add_argument(
"-V",
"--version",
action="version",
version="1.0",
help="Show version information.",
)
parser.add_argument(
"-c",
"--comparation",
choices=["hash", "md5", "sha1", "sha256"],
default="hash",
help="Method to compare files.",
)
parser.add_argument(
"-e",
"--exclude",
nargs="+",
default=["png", "jpg"],
help="File extensions to exclude.",
)
parser.add_argument(
"-re",
"--recycle",
choices=["y", "yes", "n", "no"],
default="y",
help="Whether to delete files to the recycle bin.",
)
parser.add_argument(
"-t", "--thread", type=int, default=1, help="Number of threads."
)
args = parser.parse_args()

args.exclude = args.exclude[0].split(",")

remove_duplicate_files(
args.dir,
args.recursive,
args.verbose,
args.comparation,
args.exclude,
args.recycle,
args.thread,
)


if __name__ == "__main__":
main()

bf(bit_filp)

诶!我有一个想法,既然文件等数据在计算机中均以二进制的01进行存储,如果将一个压缩文件中的所有数据进行比特翻转(或者位翻转),那么是不是就完成了一次对原文件的简单加密,只需对处理后的数据再进行一次翻转就得到了原文件。

主要功能描述

读取指定文件,将文件中的所有数据进行比特翻转,然后将翻转后的数据写入到新文件中。

实现流程描述

  1. 以指定块大小读取指定目录路径的文件
  2. 对读取的数据进行所有位翻转处理
  3. 处理完毕后在指定目录中写入处理后的文件,文件的后缀保持不变,处理后的文件统统加上"after_bit_flip"前缀(如有同名文件直接覆盖)

使用说明

此工具使用python编写,调用方式为:

1
python bf.py [-h] [-d DIR] [-od OUTDIR] [-r] [-v] [-V] [-t THREAD] [-bs BLOCK_SIZE]

接受的可选参数及其释义如下表所示:

参数 可选值 释义 默认值 备注
-d/--dir 需要处理的文件的路径 指定需处理文件的路径 可处理多个路径,多个路径由’,'分隔,可支持模糊匹配(或正则表达式),必须指定至少一个路径,当某路径无法访问时抛出警告后继续执行下一条
-od/--outdir 处理后文件的输出路径 指定处理后文件的输出路径 当前工作路径 运行时首先检查路径是否存在,如输入的路径不存在,抛出警告并尝试创建路径,如果创建失败抛出警告并终止程序
-r/--recursive 递归搜索子目录 当且仅当输入的路径中包含模糊匹配(或正则表达式)语句时此参数才生效
-v/--verbose 显示处理过程的详细信息 即所有需要处理的文件数量,已处理文件数量,剩余需要处理的文件数量,当前处理的文件的处理进度(此条以百分比的形式表示)
-V/--version 显示版本信息
-h/--help 打印详细的帮助信息
-t/--thread 线程数 指定线程数 1 线程数不能超过系统支持的最大线程数,如果指定的线程数超过系统支持的最大线程数,则弹出警告并使用系统支持的最大线程数
-bs/block-size 块大小 指定块大小 4MB 块大小不能超过 1GB,如果指定的块大小超过 1GB,则弹出警告并使用 1GB;块大小不能小于 1MB,若小于 1MB 就抛出警告并使用 1MB,仅支持识别单位 M,MB,GB,G 或无单位(无单位默认为 MB)其余不支持

备注

  1. 执行过程中出现异常,打印异常信息并跳过当前文件,继续执行下一个文件
  2. 支持多线程

源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
import os
import sys
import re
import concurrent.futures
import warnings

# 版本信息
VERSION = "1.0"

# 作者信息
AUTHOR = "HatsuSakuraMiKu"


# 处理块大小转换
def parse_block_size(block_size_str):
if block_size_str.isdigit():
size = int(block_size_str) * 1024 * 1024 # 默认单位为 MB
elif block_size_str.endswith(("M", "MB")):
# 直接使用 replace 方法去除单位
size = int(block_size_str.replace("M", "").replace("B", "")) * 1024 * 1024
elif block_size_str.endswith(("G", "GB")):
size = (
int(block_size_str.replace("G", "").replace("B", "")) * 1024 * 1024 * 1024
)
else:
warnings.warn(f"不支持的块大小单位: {block_size_str},使用默认值 4MB")
size = 4 * 1024 * 1024
if size < 1024 * 1024:
warnings.warn(f"块大小不能小于 1MB,使用 1MB")
size = 1024 * 1024
elif size > 1024 * 1024 * 1024:
warnings.warn(f"块大小不能超过 1GB,使用 1GB")
size = 1024 * 1024 * 1024
return size


# 查找所有匹配的文件
def find_files(paths, recursive=False):
all_files = []
for path in paths:
if "*" in path or "?" in path:
if recursive:
base_dir = os.path.dirname(path) if os.path.dirname(path) else "."
pattern = os.path.basename(path)
for root, dirs, files in os.walk(base_dir):
for file in files:
if re.fullmatch(
pattern.replace(".", r"\.")
.replace("*", ".*")
.replace("?", "."),
file,
):
all_files.append(os.path.join(root, file))
else:
dir_path = os.path.dirname(path) if os.path.dirname(path) else "."
pattern = os.path.basename(path)
for file in os.listdir(dir_path):
if re.fullmatch(
pattern.replace(".", r"\.")
.replace("*", ".*")
.replace("?", "."),
file,
):
all_files.append(os.path.join(dir_path, file))
else:
if os.path.isfile(path):
all_files.append(path)
else:
print(f"路径 {path} 无法访问,跳过")
return all_files


# 检查并创建输出目录
def check_and_create_outdir(outdir):
if not os.path.exists(outdir):
try:
os.makedirs(outdir)
except OSError as e:
print(f"无法创建输出目录 {outdir}: {e},终止程序")
sys.exit(1)


# 比特翻转处理块
def bit_flip_block(block):
return bytes(~byte & 0xFF for byte in block)


# 处理单个文件
def process_file(input_file_path, output_file_path, block_size, verbose=False):
try:
file_size = os.path.getsize(input_file_path)
with open(input_file_path, "rb") as input_file, open(
output_file_path, "wb"
) as output_file:
processed_size = 0
while True:
block = input_file.read(block_size)
if not block:
break
flipped_block = bit_flip_block(block)
output_file.write(flipped_block)
processed_size += len(block)
if verbose:
progress = processed_size / file_size * 100
print(
f"\r正在处理 {input_file_path}: {progress:.2f}%",
end="",
flush=True,
)
if verbose:
print()
except Exception as e:
print(f"处理文件 {input_file_path} 时出错: {e},跳过")


# 主函数
def main():
import argparse

# 添加详细的描述和使用示例
parser = argparse.ArgumentParser(
description="**********文件比特翻转处理程序**********\n该程序用于对指定路径下的文件进行比特翻转处理。处理后的文件会加上 'after_bit_flip' 前缀并保存到指定输出路径。",
epilog="使用示例:\n"
" python bit_flip.py -d ./test.rar\n"
" python bit_flip.py -d ./\*.rar -r -v -bs 16 -t 4 -od ./output\n",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument(
"-d",
"--dir",
help="需要处理的文件的路径,多个路径由','分隔,可支持模糊匹配",
)
parser.add_argument(
"-od", "--outdir", default=os.getcwd(), help="处理后文件的输出路径"
)
parser.add_argument("-r", "--recursive", action="store_true", help="递归搜索子目录")
parser.add_argument(
"-v", "--verbose", action="store_true", help="显示处理过程的详细信息"
)
parser.add_argument("-V", "--version", action="store_true", help="显示版本信息")
parser.add_argument("-t", "--thread", type=int, default=1, help="指定线程数")
parser.add_argument(
"-bs",
"--block-size",
default="4MB",
help="指定块大小,支持单位 M,MB,GB,G 或无单位(默认 MB)",
)
args = parser.parse_args()

if args.version:
print(f"版本号: {VERSION}\n 作者: {AUTHOR}")
return

# 解析块大小
block_size = parse_block_size(args.block_size)

# 检查并创建输出目录
check_and_create_outdir(args.outdir)

# 查找所有需要处理的文件
if args.dir is None:
print("请指定需要处理的文件路径")
sys.exit(1)
input_paths = args.dir.split(",")
all_files = find_files(input_paths, args.recursive)

if args.verbose:
print(f"需要处理的文件数量: {len(all_files)}")

# 处理文件
processed_count = 0
with concurrent.futures.ThreadPoolExecutor(max_workers=args.thread) as executor:
futures = []
for input_file in all_files:
file_name = os.path.basename(input_file)
output_file = os.path.join(args.outdir, f"after_bit_flip_{file_name}")
future = executor.submit(
process_file, input_file, output_file, block_size, args.verbose
)
futures.append(future)

for future in concurrent.futures.as_completed(futures):
processed_count += 1
if args.verbose:
remaining = len(all_files) - processed_count
print(
f"已处理文件数量: {processed_count},剩余需要处理的文件数量: {remaining}"
)


if __name__ == "__main__":
main()


get_book_info.py

tg上有那种分享book的频道,通常是发送book的一些元数据如titleauthor等和book的在线预览链接和原始链接,像这样alt text
有些频道会发送book的压缩包,有些不会。正好tg支持把聊天记录导出为html文件,所以我们可以通过解析html文件来获取book的元数据和链接,得到链接后使用通过一些方法抓取book图片,然后将图片打包为cbz格式,并使用解析得到的元数据作为cbz的元数据。

元数据的格式如下:

1
2
3
4
5
6
7
8
9
10
11
12
book_info = {
"name": [], # 名称
"language": [], # 语言
"artist": [], # 艺术家
"artists": [], # 团队
"characters": [], # 角色
"original": [], # 原作
"tags": {"女性": [], "男性": [], "混合": [], "其他": []}, # 标签
"links": [{"预览"""},{"原始地址": ""}], # 链接
"other": {}, # 其他信息
"exID": "", # 存储到数据库时的唯一标识符,从"原始链接提取"
}

使用方法

  1. tg上,打开需要导出聊天记录的聊天,点击右上角的三个点,选择Export Chat,选择HTML格式,然后点击Export按钮,将导出的html文件放在脚本所在的目录下。
  2. 运行get_book_info.py,会在当前目录下生成book_info.json,里面包含了book的元数据和链接。此外还会生成all_book_info.db数据库,里面包含了所有book的元数据和链接。
  3. 使用 from_output_get_links.py解析output.json文件,得到所有book的链接,并将其们保存到links.txt文件中。
  4. 使用 TGIDownloader_EN或者其他方法批量下载book图片,将图片保存到本地。
  5. 自行打包图片为cbz格式,并使用解析得到的元数据作为cbz的元数据。

代码

  1. get_book_info.py

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    186
    187
    188
    189
    190
    191
    192
    193
    194
    195
    196
    197
    198
    199
    200
    201
    202
    203
    204
    205
    206
    207
    208
    209
    210
    211
    212
    213
    214
    215
    216
    217
    218
    219
    220
    221
    222
    223
    224
    225
    226
    227
    228
    229
    230
    231
    232
    233
    234
    235
    236
    237
    238
    239
    240
    241
    242
    243
    244
    245
    246
    247
    248
    249
    250
    251
    252
    253
    254
    255
    256
    257
    258
    259
    260
    261
    262
    263
    264
    265
    266
    267
    268
    269
    270
    271
    272
    273
    274
    275
    276
    277
    278
    279
    280
    281
    282
    283
    284
    285
    286
    287
    288
    289
    290
    291
    292
    293
    294
    295
    296
    297
    298
    299
    300
    301
    302
    303
    304
    305
    306
    307
    308
    309
    310
    311
    312
    313
    314
    315
    316
    317
    318
    319
    320
    321
    322
    323
    324
    325
    326
    327
    328
    329
    330
    331
    332
    333
    334
    335
    336
    337
    338
    339
    340
    341
    342
    343
    344
    345
    346
    347
    348
    349
    350
    351
    352
    353
    354
    355
    356
    357
    358
    359
    360
    361
    362
    363
    364
    365
    366
    367
    368
    369
    370
    371
    372
    373
    374
    375
    376
    377
    378
    379
    380
    381
    382
    383
    384
    385
    386
    387
    388
    389
    390
    391
    392
    393
    394
    395
    396
    397
    398
    399
    400
    401
    402
    403
    404
    405
    406
    407
    408
    409
    410
    411
    412
    413
    414
    415
    416
    417
    418
    419
    420
    421
    422
    423
    424
    425
    426
    427
    428
    429
    430
    431
    432
    433
    434
    435
    436
    437
    438
    439
    440
    441
    442
    443
    444
    445
    446
    447
    448
    449
    450
    import os
    import re
    import json
    from bs4 import BeautifulSoup
    import sqlite3

    """
    简要文档:https://hsmk.mikufans.date/posts/9a7cb586.html
    """


    def extract_tag_info(text_div):
    tag_info_list = []
    code_tags = text_div.find_all("code")
    for code_tag in code_tags:
    label = code_tag.get_text().strip()
    if (
    code_tag.find_next_sibling() == None
    or code_tag.find_next_sibling().name == "br"
    ):
    next_sibling = code_tag.next_sibling
    else:
    next_sibling = code_tag.find_next_sibling()
    extracted_value = extract_value_from_sibling(next_sibling)
    if extracted_value is not None:
    tag_info_list.append({label: extracted_value})
    return tag_info_list


    def extract_value_from_sibling(next_sibling):

    # 如果next_sibling存在且其标签名为"a",并且包含"onclick"属性,则执行以下操作
    if next_sibling and next_sibling.name == "a" and "onclick" in next_sibling.attrs:
    hashtags = []

    # 当next_sibling存在且其标签名为"a"时,执行以下操作
    while next_sibling and next_sibling.name == "a":

    # 获取next_sibling的"onclick"属性值
    onclick = next_sibling.get("onclick")

    # 如果"onclick"属性值存在,则执行以下操作
    if onclick:

    # 使用正则表达式匹配"onclick"属性值中的"ShowHashtag"函数参数
    match = re.search(r'return ShowHashtag\("(.*?)"\)', onclick)

    # 如果匹配成功,则将匹配结果添加到hashtags列表中
    if match:
    hashtags.append(match.group(1))

    # 如果next_sibling的下一个兄弟节点标签名为"a",且next_sibling的下一个兄弟节点的下一个兄弟节点不存在,则将next_sibling设置为next_sibling的下一个兄弟节点
    if (
    next_sibling.find_next_sibling().name == "a"
    and next_sibling.next_sibling.name == None
    ):
    next_sibling = next_sibling.find_next_sibling()
    else:
    # 否则,将next_sibling设置为next_sibling的下一个兄弟节点
    next_sibling = next_sibling.next_sibling

    # 如果next_sibling存在且其标签名为None,则将hashtags列表中最后一个元素与next_sibling的文本内容拼接
    if next_sibling and next_sibling.name == None:
    hashtags[-1] += next_sibling.get_text().strip()
    # 返回hashtags列表
    return hashtags
    # 如果next_sibling存在且其标签名为"a",并且包含"href"属性,则执行以下操作
    elif next_sibling and next_sibling.name == "a" and "href" in next_sibling.attrs:
    hashtags = []

    # 如果next_sibling的标签名为"a",则执行以下操作
    if next_sibling.name == "a":
    # 获取next_sibling的"href"属性值

    href = next_sibling.get("href")

    # 如果"href"属性值存在,则将"href"属性值添加到hashtags列表中
    if href:

    hashtags.append(href)
    value = (
    next_sibling.strip()
    if isinstance(next_sibling, str)
    else next_sibling.get_text().strip()
    )
    hashtags.append(value)
    return hashtags

    elif next_sibling:
    hashtags = []
    while next_sibling and next_sibling.name != "code":
    value = (
    (
    next_sibling.strip()
    if isinstance(next_sibling, str)
    else next_sibling.get_text().strip()
    )
    .replace(":", "")
    .strip()
    )
    if value:
    hashtags.append(value)
    if next_sibling.next_sibling:
    next_sibling = next_sibling.next_sibling
    else:
    next_sibling = next_sibling.find_next_sibling()
    return hashtags

    return None


    def fill_book_info(tag_info_list):
    book_info = {
    "name": [], # 名称
    "language": [], # 语言
    "artist": [], # 艺术家
    "artists": [], # 团队
    "characters": [], # 角色
    "original": [], # 原作
    "tags": {"女性": [], "男性": [], "混合": [], "其他": []}, # 标签
    "links": [], # 链接
    "other": {}, # 其他信息
    "exID": "", # 存储到数据库时的唯一标识符
    }
    for tag_info in tag_info_list:
    for label, value in tag_info.items():
    if label == "语言":
    book_info["language"] = value
    elif label == "角色":
    book_info["characters"] = value
    elif label == "女性":
    book_info["tags"]["女性"] = value
    elif label == "男性":
    book_info["tags"]["男性"] = value
    elif label == "混合":
    book_info["tags"]["混合"] = value
    elif label == "艺术家":
    book_info["artist"] = value
    elif label == "团队":
    book_info["artists"] = value
    elif label in ["其他", "other"]:
    book_info["tags"]["其他"] = value
    elif label == "原作":
    book_info["original"] = value
    elif label == "预览":
    if len(value) == 2:
    url, text = value
    name_part = url.replace("https://telegra.ph/", "")
    book_info["name"].append(name_part)
    book_info["name"].append(text)
    book_info["links"].append({"预览": url})
    elif label == "原始地址":
    book_info["links"].append({"原始地址": value[0]})
    if value[0]:
    match = re.search(r"/g/(\d+/[a-z0-9]+)/", value[0])
    if match:
    book_info["exID"] = match.group(1)
    else:
    book_info["other"][label] = value

    return book_info


    def extract_info(html_content):
    soup = BeautifulSoup(html_content, "html.parser")
    message_divs = soup.find_all(
    "div", class_=lambda x: x and ("message default clearfix" in x)
    )
    all_book_info = []
    links_key_set = []

    for message_div in message_divs:
    body_div = message_div.find("div", class_="body")
    if not body_div:
    continue
    text_div = body_div.find("div", class_="text")
    if not text_div:
    continue

    if not text_div.find(
    "code", string=lambda s: "语言" in s.strip() if s else False
    ):
    continue

    tag_info_list = extract_tag_info(text_div)
    book_info = fill_book_info(tag_info_list)
    if book_info["exID"] not in links_key_set:
    links_key_set.append(book_info["exID"])
    all_book_info.append(book_info)

    return all_book_info


    def process_html_files(folder_path):
    all_book_info = []
    for filename in os.listdir(folder_path):
    if filename.endswith(".html"):
    file_path = os.path.join(folder_path, filename)
    # try:
    with open(file_path, "r", encoding="utf-8") as file:
    html_content = file.read()
    info_list = extract_info(html_content)
    all_book_info.extend(info_list)
    # except Exception as e:
    # print(f"处理文件 {filename} 时出错: {e}")
    return all_book_info


    def save_to_json(data, output_file):
    with open(output_file, "w", encoding="utf-8") as file:
    json.dump(data, file, ensure_ascii=False, indent=4)


    def create_database(db_name):
    conn = sqlite3.connect(db_name)
    cursor = conn.cursor()
    cursor.execute(
    """
    CREATE TABLE IF NOT EXISTS Books (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    exID TEXT UNIQUE,
    name TEXT,
    language TEXT,
    artist TEXT,
    artists TEXT,
    characters TEXT,
    original TEXT,
    other TEXT
    )
    """
    )
    cursor.execute(
    """
    CREATE TABLE IF NOT EXISTS Tags (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    book_id INTEGER,
    category TEXT,
    tag TEXT,
    FOREIGN KEY (book_id) REFERENCES Books (id)
    )
    """
    )
    cursor.execute(
    """
    CREATE TABLE IF NOT EXISTS Links (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    book_id INTEGER,
    link_type TEXT,
    link TEXT,
    FOREIGN KEY (book_id) REFERENCES Books (id)
    )
    """
    )
    conn.commit()
    return conn


    def save_to_litesql(all_book_info, db_path="books.db"):

    conn = create_database(db_path) # 连接到已存在的数据库
    cursor = conn.cursor()

    for book_info in all_book_info:
    # 查询是否已存在
    cursor.execute("SELECT * FROM Books WHERE exID = ?", (book_info["exID"],))
    existing_book = cursor.fetchone()

    if existing_book:
    continue # 如果已存在,则跳过
    # # 如果已存在,检查并更新
    # existing_book_info = {
    # "name": existing_book[2].split(", ") if existing_book[2] else [],
    # "language": existing_book[3].split(", ") if existing_book[3] else [],

    # "artist": existing_book[4].split(", ") if existing_book[4] else [],
    # "artists": existing_book[5].split(", ") if existing_book[5] else [],
    # "characters": existing_book[6].split(", ") if existing_book[6] else [],
    # "original": existing_book[7].split(", ") if existing_book[7] else [],
    # "other": (
    # json.loads(existing_book[8]) if existing_book[8] else {}
    # ), # 将 JSON 字符串转换为字典
    # }

    # # 检查并合并数据
    # if book_info["name"]:
    # existing_book_info["name"] = list(
    # set(existing_book_info["name"] + book_info["name"])
    # )
    # if book_info["language"]:
    # existing_book_info["language"] = list(
    # set(existing_book_info["language"] + book_info["language"])
    # )
    # if book_info["artist"]:
    # existing_book_info["artist"] = list(
    # set(existing_book_info["artist"] + book_info["artist"])
    # )
    # if book_info["artists"]:
    # existing_book_info["artists"] = list(
    # set(existing_book_info["artists"] + book_info["artists"])
    # )
    # if book_info["characters"]:
    # existing_book_info["characters"] = list(
    # set(existing_book_info["characters"] + book_info["characters"])
    # )
    # if book_info["original"]:
    # existing_book_info["original"] = book_info["original"] # 可能需要覆盖
    # existing_book_info["other"].update(book_info["other"]) # 合并字典

    # # 更新数据库
    # cursor.execute(
    # """
    # UPDATE Books SET
    # name = ?,
    # language = ?,
    # artist = ?,
    # artists = ?,
    # characters = ?,
    # original = ?,
    # other = ?
    # WHERE exID = ?
    # """,
    # (
    # ", ".join(existing_book_info["name"]),
    # ", ".join(existing_book_info["language"]),
    # ", ".join(existing_book_info["artist"]),
    # ", ".join(existing_book_info["artists"]),
    # ", ".join(existing_book_info["characters"]),
    # ", ".join(existing_book_info["original"]),
    # json.dumps(existing_book_info["other"]),
    # book_info["exID"],
    # ),
    # )
    else:
    # 插入新记录
    # existing_book_info = {} # 为 existing_book_info 赋默认值
    cursor.execute(
    """
    INSERT INTO Books (exID, name, language, artist, artists, characters, original, other)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    """,
    (
    book_info["exID"],
    ", ".join([name.replace(", ", ",") for name in book_info["name"]]),
    ", ".join(book_info["language"]),
    ", ".join(book_info["artist"]),
    ", ".join(book_info["artists"]),
    ", ".join(book_info["characters"]),
    ", ".join(book_info["original"]),
    json.dumps(book_info["other"]), # 将字典转换为 JSON 字符串
    ),
    )

    # 获取插入后的 book_id
    cursor.execute("SELECT id FROM Books WHERE exID = ?", (book_info["exID"],))
    book_id = cursor.fetchone()[0]

    # 插入 Tags 表
    for category, tags in book_info["tags"].items():
    for tag in tags:
    cursor.execute(
    """
    INSERT INTO Tags (book_id, category, tag)
    VALUES (?, ?, ?)
    """,
    (book_id, category, tag),
    )

    # 插入 Links 表
    for link in book_info["links"]:
    for link_type, link_value in link.items():
    cursor.execute(
    """
    INSERT INTO Links (book_id, link_type, link)
    VALUES (?, ?, ?)
    """,
    (book_id, link_type, link_value),
    )

    conn.commit()
    conn.close() # 关闭数据库连接


    def read_from_litesql(db_path="books.db"):
    # 连接到数据库
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    # 查询 Books 表中的所有书籍
    cursor.execute("SELECT * FROM Books")
    books = cursor.fetchall()

    all_book_info = []

    for book in books:
    book_info = {
    "exID": book[1],
    "name": (
    [name.replace(",", ", ") for name in book[2].split(", ")]
    if book[2]
    else []
    ),
    "language": book[3].split(", ") if book[3] else [],
    "artist": book[4].split(", ") if book[4] else [],
    "artists": book[5].split(", ") if book[5] else [],
    "characters": book[6].split(", ") if book[6] else [],
    "original": book[7].split(", ") if book[7] else [],
    "other": json.loads(book[8]) if book[8] else {},
    "tags": {},
    "links": [],
    }

    # 查询 Tags 表中与当前书籍相关的标签
    cursor.execute("SELECT category, tag FROM Tags WHERE book_id = ?", (book[0],))
    tags = cursor.fetchall()
    for category, tag in tags:
    if category not in book_info["tags"]:
    book_info["tags"][category] = []
    book_info["tags"][category].append(tag)

    # 查询 Links 表中与当前书籍相关的链接
    cursor.execute(
    "SELECT link_type, link FROM Links WHERE book_id = ?", (book[0],)
    )
    links = cursor.fetchall()
    for link_type, link_value in links:
    book_info["links"].append({link_type: link_value})

    all_book_info.append(book_info)

    conn.close() # 关闭数据库连接
    return all_book_info


    def main():
    folder_path = "./" # 替换为实际的 HTML 文件所在文件夹路径
    output_file = "book_info.json"
    all_book_info_json = "E:\\H_book_info\\all_book_info.json"
    database_path = "E:\\H_book_info\\all_book_info.db"

    book_info_list = process_html_files(folder_path) # 获取书籍信息列表
    save_to_json(book_info_list, output_file) # 保存书籍信息到 JSON 文件
    save_to_litesql(book_info_list, database_path) # 保存书籍信息到 SQLite 数据库
    if all_book_info_json:
    book_info_list = read_from_litesql(
    database_path
    ) # 从 SQLite 数据库读取书籍信息
    save_to_json(book_info_list, all_book_info_json) # 保存书籍信息到 JSON 文件

    if __name__ == "__main__":
    main()
  2. from_output_get_links.py

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    import json


    def extract_preview_links():
    # 读取JSON文件
    try:
    with open("output.json", "r", encoding="utf-8") as json_file:
    data = json.load(json_file)
    except FileNotFoundError:
    print("output.json文件未找到,请检查文件路径。")
    return
    except json.JSONDecodeError:
    print("output.json文件格式错误,请检查JSON格式。")
    return

    preview_links = []
    # 遍历JSON数据中的每个对象
    for item in data:
    # 获取links列表
    links = item.get("links", [])
    for link in links:
    # 检查是否有预览链接
    if "预览" in link:
    preview_links.append(link["预览"])

    # 将预览链接写入link.txt文件
    with open("link.txt", "w", encoding="utf-8") as txt_file:
    for link in preview_links:
    txt_file.write(link + "\n")

    print(f"共找到 {len(preview_links)} 个预览链接,已写入link.txt文件。")


    if __name__ == "__main__":
    extract_preview_links()