md_files/Java/草稿.md

1232 lines
42 KiB
Markdown
Raw Normal View History

产品官网:[智标领航 - 招投标AI解决方案](https://intellibid.cn/home)
产品后台https://intellibid.cn:9091/login?redirect=%2Findex
项目地址:[zy123/zbparse - zbparse - 智标领航代码仓库](http://47.98.59.178:3000/zy123/zbparse)
git clone地址http://47.98.59.178:3000/zy123/zbparse.git
选择develop分支develop-xx 后面的xx越近越新。
正式环境121.41.119.164:5000
测试环境47.98.58.178:5000
大解析指从招标文件解析入口进去upload.py
小解析从投标文件生成入口进去little_zbparse 和get_deviation两个接口后端一起调
## 项目启动与维护:
![1](https://pic.bitday.top/i/2025/03/24/oto7o6-0.png)
.env存放一些密钥大模型、textin等它是gitignore忽略了因此在服务器上git pull项目的时候这个文件不会更新因为密钥比较重要需要手动维护服务器相应位置的.env。
### **如何更新服务器上的版本:**
#### 步骤
1. 进入项目文件夹
![1](https://pic.bitday.top/i/2025/03/24/oto3eb-0.png)
**注意:**需要确认.env是否存在在服务器默认是隐藏的
输入cat .env
如果不存在在项目文件夹下sudo vim .env
将密钥粘贴进去!!!
2. git pull
3. sudo docker-compose up --build -d 更新并重启
或者 sudo docker-compose build 先构建镜像
sudo docker-compose up -d 等空间时再重启
4. sudo docker-compose logs flask_app --since 1h 查看最近1h的日志如果重启后报错也能查看推荐重启后都运行一下这个
requirements.txt一般无需变动除非代码中使用了新的库也要手动在该文件中添加包名及对应的版本
#### docker基础知识
**docker-compose:**
![1](https://pic.bitday.top/i/2025/03/24/oto6gb-0.png)
本项目为**单服务项目**只有flask_app(服务名)
build context`context: .`
这是在构建镜像时提供给 Docker 的文件集,指明哪些文件可以被 Dockerfile 中的 `COPY``ADD` 指令使用。它是构建过程中的“资源包”。
对于多服务build下就要针对不同的服务指定所需的“资源包”和对应的Dockerfile
**dockerfile:**
![1](https://pic.bitday.top/i/2025/03/24/oto47x-0.png)
COPY . .(在 Dockerfile 中):
这条指令会将构建上下文中的所有内容复制到镜像中的当前工作目录(这里是 `/flask_project`)。
```text
docker exec -it zbparse-flask_app-1 sh
```
这个命令会直接进入到flask_project目录内部ls之后可以看到
```text
Dockerfile README.md docker-compose.yml flask_app md_files requirements.txt
```
如果这个基础上再` cd / `会切换到这个容器的根目录可以看到flask_project文件夹以及其他基础系统环境。如
```text
bin boot dev etc flask_project home lib lib64 media mnt opt proc root run sbin srv sys tmp usr var
```
**数据卷挂载:**
volumes:
-/home/Z/zbparse_output_dev:/flask_project/flask_app/static/output *#* *额外的数据卷挂载*
本地路径:容器内路径 都从根目录找起。
**完整的容器名**
```text
<项目名>-<服务名>-<序号>
```
**项目名**:默认是当前目录的名称(这里是 `zbparse`),或者你在启动 Docker Compose 时通过 `-p` 参数指定的项目名称。
**服务名**:在 `docker-compose.yml` 文件中定义的服务名称(这里是 `flask_app`)。
**序号**:如果同一个服务启动了多个容器,会有数字序号来区分(这里是 `1`)。
docker-compose exec flask_app sh
docker exec -it zbparse-flask_app-1 sh
这两个是等价的因为docker-compose 会自动找到对应的完整容器名并执行命令。
**删除所有悬空镜像**(无容器引用的 `<none>` 镜像)
```text
docker image prune
```
### **如何本地启动本项目:**
**Pycharm启动**
1. requirements.txt里的环境要配好
conda create -n zbparse python=3.8
conda activate zbparse
pip install -r requirements.txt
2. .env环境配好 一般不需要在电脑环境变量中额外配置了但是要在Pycharm中**安装插件**,使得项目在**启动时**能将env中的环境变量**自动配置**到系统环境变量中!!!)
3. 点击下拉框Edit configurations
![1](https://pic.bitday.top/i/2025/03/24/oto61f-0.png)
设置run_serve.py为启动脚本![1](https://pic.bitday.top/i/2025/03/24/io729q-2.png)
注意这里的working directory要设置到最外层文件夹而不是flask_app
**命令行启动**
1.编写ps1脚本
```text
# 切换到指定目录
cd D:\PycharmProjects\zbparse
# 激活 Conda 环境
conda activate zbparse
# 检查是否存在 .env 文件
if (Test-Path .env) {
# 读取 .env 文件并设置环境变量
Get-Content .env | ForEach-Object {
if ($_ -match '^\s*([^=]+)=(.*)') {
$name = $matches[1].Trim()
$value = $matches[2].Trim()
[System.Environment]::SetEnvironmentVariable($name, $value)
}
}
} else {
Write-Host ".env not find"
}
# 设置 PYTHONPATH 环境变量
$env:PYTHONPATH = "D:\flask_project"
# 运行 Python 脚本
python flask_app\run_serve.py
```
`$env:PYTHONPATH = "D:\flask_project"`,告诉 Python 去 D:\flask_project 查找模块,这样就能让 Python 找到你的 `flask_app` 包。
2.确保conda已添加到系统环境变量
- 打开 Anaconda Prompt然后输入 `where conda` 来查看 conda 的路径。
- ```text
打开系统环境变量Path添加一条C:\ProgramData\anaconda3\condabin
或者 CMD 中 set PATH=%PATH%;新添加的路径
```
- 重启终端可以刷新环境变量
3.如果你尚未在 PowerShell 中初始化 conda可以在 Anaconda Prompt 中运行:
```text
conda init powershell
```
4.进入到存放run.ps1文件的目录在搜索栏中输入powershell
5.默认情况下PowerShell 可能会阻止运行脚本。你可以调整执行策略:
```text
Set-ExecutionPolicy RemoteSigned -Scope CurrentUser
```
6.运行脚本
```text
.\run.ps1
```
**注意!!!**
Windows 控制台存在QuickEdit 模式,在 QuickEdit 模式下,当你在终端窗口中点击(尤其是拖动或选中内容)时,控制台会进入文本选择状态,从而暂停正在运行的程序!!
**禁用 QuickEdit 模式**
- 在 PowerShell 窗口标题栏上点击右键,选择“属性”。
- 在“选项”选项卡中,取消勾选“快速编辑模式”。
- 点击“确定”,重启 PowerShell 窗口后再试。
### 模拟用户请求
postman打**post**请求测试:
http://127.0.0.1:5000/upload
body:
{
"file_url":"xxxx",
"zb_type":2
}
file_url如何获取[OSS管理控制台](https://oss.console.aliyun.com/bucket/oss-cn-wuhan-lr/bid-assistance/object?path=test%2F)
bid-assistance/test 里面找个文件的url推荐'094定稿-湖北工业大学xxx'
注意这里的url地址有时效性要经常重新获取新的url
### 清理服务器上的文件夹
**1.编写shell文件**sudo vim clean_dir.sh
清理/home/Z/zbparse_output_dev下的output1这些二级目录下的c8d2140d-9e9a-4a49-9a30-b53ba565db56这种uuid的三级目录只保留最近7天
```text
#!/bin/bash
# 需要清理的 output 目录路径
ROOT_DIR="/home/Z/zbparse_output_dev"
# 检查目标目录是否存在
if [ ! -d "$ROOT_DIR" ]; then
echo "目录 $ROOT_DIR 不存在!"
exit 1
fi
echo "开始清理 $ROOT_DIR 下超过 7 天的目录..."
echo "以下目录将被删除:"
# -mindepth 2 表示从第二层目录开始查找,防止删除 output 下的直接子目录(如 output1、output2
# -depth 采用深度优先遍历,确保先处理子目录再处理父目录
find "$ROOT_DIR" -mindepth 2 -depth -type d -mtime +7 -print -exec rm -rf {} \;
echo "清理完成。"
```
**2.添加权限。**
```text
sudo chmod +x ./clean_dir.sh
```
**3.执行**
```text
sudo ./clean_dir.sh
```
**4.以 root 用户的身份编辑 crontab 文件**从而设置或修改系统定时任务cron jobs。每天零点10分清理
```text
sudo crontab -e
在里面添加:
10 0 * * * /home/Z/clean_dir.sh
```
**目前测试服务器和正式服务器都写上了!无需变动**
### 内存泄漏问题
#### 问题定位
**查看容器运行时占用的文件FD套接字FD等**(排查内存泄漏,长期运行这三个值不会很大)
```text
[Z@iZbp13rxxvm0y7yz7l02hbZ zbparse]$ docker exec -it zbparse-flask_app-1 sh
ls -l /proc/1/fd | awk '
BEGIN {
file=0; socket=0; pipe=0; other=0
}
{
if(/socket:/) socket++
else if(/pipe:/) pipe++
else if(/\/|tmp/) file++ # 识别文件路径特征
else other++
}
END {
print "文件FD:", file
print "套接字FD:", socket
print "管道FD:", pipe
print "其他FD:", other
}'
```
**可以发现文件FD很大基本上发送一个请求文件FD就加一且不会衰减**
经排查,@validate_and_setup_logger注解会为每次请求都创建一个logger,需要在@app.teardown_request中获取与本次请求有关的logger并释放。
```text
def create_logger(app, subfolder):
"""
创建一个唯一的 logger 和对应的输出文件夹。
参数:
subfolder (str): 子文件夹名称,如 'output1', 'output2', 'output3'
"""
unique_id = str(uuid.uuid4())
g.unique_id = unique_id
output_folder = os.path.join("flask_app", "static", "output", subfolder, unique_id)
os.makedirs(output_folder, exist_ok=True)
log_filename = "log.txt"
log_path = os.path.join(output_folder, log_filename)
logger = logging.getLogger(unique_id)
if not logger.handlers:
file_handler = logging.FileHandler(log_path)
file_formatter = CSTFormatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(file_formatter)
logger.addHandler(file_handler)
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(logging.Formatter('%(message)s'))
logger.addHandler(stream_handler)
logger.setLevel(logging.INFO)
logger.propagate = False
g.logger = logger
g.output_folder = output_folder #输出文件夹路径
```
handler每当 logger 生成一条日志信息时,这条信息会被传递给所有关联的 handler由 handler 决定如何输出这条日志。例如,`FileHandler` 会把日志写入文件,而 `StreamHandler` 会将日志输出到控制台。
`logger.setLevel(logging.INFO)` :它设置了 logger 的日志级别阈值。Logger **只会处理大于或等于 INFO 级别**的日志消息(例如 INFO、WARNING、ERROR、CRITICAL而 DEBUG 级别的消息会被忽略。
**解决这个文件句柄问题后内存泄漏仍未解决,考虑分模块排查。**
本项目结构大致是**1.**预处理(文件读取切分) **2.**并发调用5个函数分别调用大模型获取结果。
因此排查思路:
先将**预处理模块单独拎出来**作为接口,上传文件测试。
文件一般几MB首先会读到内存再处理必然会占用很多内存且它是调用每个接口都会经历的环节(little_zbparse/upload等)
**内存泄漏排查工具**
pip install **memory_profiler**
```text
from memory_profiler import memory_usage
import time
@profile
def my_function():
a = [i for i in range(100000)]
time.sleep(1) # 模拟耗时操作
b = {i: i*i for i in range(100000)}
time.sleep(1)
return a, b
# 监控函数“运行前”和“运行后”的内存快照
mem_before = memory_usage()[0]
result=my_function()
mem_after = memory_usage()[0]
print(f"Memory before: {mem_before} MiB, Memory after: {mem_after} MiB")
```
@profile注解加在函数上,可以逐行分析内存增减情况。
memory_usage()[0] 可以获取当前程序所占内存的**快照**
![1](https://pic.bitday.top/i/2025/03/24/otpc8s-0.png)
产生的数据都存到result变量-》内存中这是正常的因此my_function没有内存泄漏问题。
**但是**
```text
@profile
def extract_text_by_page(file_path):
result = ""
with open(file_path, 'rb') as file:
reader =PdfReader(file)
num_pages = len(reader.pages)
# print(f"Total pages: {num_pages}")
for page_num in range(num_pages):
page = reader.pages[page_num]
text = page.extract_text()
return ""
```
![1](https://pic.bitday.top/i/2025/03/24/invnrj-2.png)
可以发现尽管我返回""内存仍然没有释放因为就是读取pdf这块发生了内存泄漏
**tracemalloc**
```text
def extract_text_by_page(file_path):
result = ""
with open(file_path, 'rb') as file:
reader =PdfReader(file)
num_pages = len(reader.pages)
# print(f"Total pages: {num_pages}")
for page_num in range(num_pages):
page = reader.pages[page_num]
text = page.extract_text()
return result
# 开始跟踪内存分配
tracemalloc.start()
# 捕捉函数调用前的内存快照
snapshot_before = tracemalloc.take_snapshot()
# 调用函数
file_path=r'C:\Users\Administrator\Desktop\fsdownload\00550cfc-fd33-469e-8272-9215291b175c\ztbfile.pdf'
result = extract_text_by_page(file_path)
# 捕捉函数调用后的内存快照
snapshot_after = tracemalloc.take_snapshot()
# 比较两个快照,获取内存分配差异信息
stats = snapshot_after.compare_to(snapshot_before, 'lineno')
print("[ Top 10 内存变化 ]")
for stat in stats[:10]:
print(stat)
# 停止内存分配跟踪
tracemalloc.stop()
```
![1](https://pic.bitday.top/i/2025/03/24/io6evy-2.png)
tracemalloc能更深入的分析不仅是自己写的代码**调用的库函数**产生的内存也能分析出来。在这个例子中就是PyPDF2中的各个函数占用了大部分内存。
**综上定位到问题就是读取PDF使用PyPDF2库的地方**
#### **如何解决:**
1. 首先尝试用with open打开文件代替直接使用
```text
reader =PdfReader(file_path)
```
能够确保文件正常关闭。但是没有效果。
2. 考虑为**每次请求开子进程**处理,有效**隔离内存泄漏**导致的资源占用,这样子进程运行结束后会释放资源。
3. 但是解析流程是流式/分段返回的,因此还需处理:
**_child_target** 是一个“桥梁”:
- 它在子进程内调用 `goods_bid_main(...)` (你的生成器) 并把每一次 `yield` 得到的数据放进队列。
- 结束时放一个 `None` 表示没有更多数据。
**run_in_subprocess** 是主进程使用的接口,开启子进程:
- 它启动子进程并实时 `get()` 队列数据,然后 `yield` 给外界调用者。
- 当队列里读到 `None`,说明子进程运行完毕,就 `break` 循环并 `p.join()`
**main_func**是真正执行的函数!!!
```text
def _child_target(main_func, queue, output_folder, file_path, file_type, unique_id):
"""
子进程中调用 `main_func`(它是一个生成器函数),
将其 yield 出的数据逐条放进队列,最后放一个 None 表示结束。
"""
try:
for data in main_func(output_folder, file_path, file_type, unique_id):
queue.put(data)
except Exception as e:
# 如果要把异常也传给父进程,以便父进程可感知
queue.put(json.dumps({'error': str(e)}, ensure_ascii=False))
finally:
queue.put(None)
def run_in_subprocess(main_func, output_folder, file_path, file_type, unique_id):
"""
启动子进程调用 `main_func(...)`,并在父进程流式获取其输出(通过 Queue
子进程结束时,操作系统回收其内存;父进程则保持实时输出。
"""
queue = multiprocessing.Queue()
p = multiprocessing.Process(
target=_child_target,
args=(main_func, queue, output_folder, file_path, file_type, unique_id)
)
p.start()
while True:
item = queue.get() # 阻塞等待子进程产出的数据
if item is None:
break
yield item
p.join()
```
如果开子线程,线程共享同一进程的内存空间,所以如果发生内存泄漏,泄漏的内存会累积在整个进程中,影响所有线程。
开子进程的缺点:多进程通常消耗的系统资源(如内存、启动开销)比多线程要大,因为每个进程都需要独立的资源和上下文切换开销。
**进程池**
在判断上传的文件是否为招标文件时,需要快速准确地响应。因此既保证**内存不泄漏**,又**保证速度**的方案就是在项目启动时创建进程池。(因为**创建进程需要耗时2到3秒**
如果是Waitress服务器启动这里的进程池是全局共享的但如果Gunicorn启动每个请求分配一个worker进程进程池是在worker里面共享的
```text
#创建app启动时
def create_app():
# 创建全局日志记录器
app = Flask(__name__)
app.process_pool = Pool(processes=10, maxtasksperchild=3)
app.global_logger = create_logger_main('model_log') # 全局日志记录器
#调用时
pool = current_app.process_pool # 使用全局的进程池
def judge_zbfile_exec_sub(file_path):
result = pool.apply(
judge_zbfile_exec, # 你的实际执行函数
args=(file_path,)
)
return result
```
但是存在一个问题:**第一次发送请求执行时间较慢!**
![1](https://pic.bitday.top/i/2025/03/24/inwuya-2.png)
可以发现实际执行只需7.7s但是接口实际耗时10.23秒,主要是因**懒加载或按需初始化**:有些模块或资源在子进程启动时并不会马上加载,而是在子进程首次真正执行任务时才进行初始化。
**解决思路提前热身warm up进程池**
在应用启动后、还没正式接受请求之前,可以提交一个简单的“空任务”或非常小的任务给进程池,让子进程先**完成相关的初始化**。这种“预热”方式能在正式请求到来之前就完成大部分初始化,减少首次请求的延迟。
**还可以快速验证服务是否正常启动**
```text
def warmup_request():
# 等待服务器完全启动,例如等待 1-2 秒
time.sleep(5)
try:
url = "http://127.0.0.1:5000/judge_zbfile"
#url必须为永久地址,完成热启动,创建进程池
payload = {"file_url": "xxx"} # 根据实际情况设置 file_url
headers = {"Content-Type": "application/json"}
response = requests.post(url, json=payload, headers=headers)
print(f"Warm-up 请求发送成功,状态码:{response.status_code}")
except Exception as e:
print(f"Warm-up 请求出错:{e}")
```
threading.Thread(target=warmup_request, daemon=True).start()
## flask_app结构介绍
<img src="https://pic.bitday.top/i/2025/03/24/io7n4q-2.png" alt="1" style="zoom:67%;" />
### 项目中做限制的地方
#### **账号、服务器分流**
服务器分流目前linux服务器和windows服务器主要是硬件上的分流文件切分需要消耗CPU资源大模型基底还是调用阿里共用的tpm qpm。
账号分流qianwen_plus下的
```text
api_keys = cycle([
os.getenv("DASHSCOPE_API_KEY"),
# os.getenv("DASHSCOPE_API_KEY_BACKUP1"),
# os.getenv("DASHSCOPE_API_KEY_BACKUP2")
])
api_keys_lock = threading.Lock()
def get_next_api_key():
with api_keys_lock:
return next(api_keys)
api_key = get_next_api_key()
```
只需轮流使用不同的api_key即可。目前没有启用。
#### **大模型的限制**
general/llm下的doubao.py 和通义千问long_plus.py
**目前是linux和windows各部署一套因此项目中的qps是对半的即calls=?**
1. 这是qianwen-long的限制针对阿里qpm为1200每秒就是20又linux和windows服务器对半就是10TPM无上限
```text
@sleep_and_retry
@limits(calls=10, period=1) # 每秒最多调用10次
def rate_limiter():
pass # 这个函数本身不执行任何操作,只用于限流
```
2. 这是qianwen-plus的限制针对tpm为1000万每个请求2万tokens那么linux和windows总的qps为8时8x60x2=960<1000单个为4
**经过2.11号测试calls=4时最高TPM为800因此把目前稳定版把calls设为5**
**2.12,用turbo作为超限后的承载目前把calls设为7**
```text
@sleep_and_retry
@limits(calls=7, period=1) # 每秒最多调用7次
def qianwen_plus(user_query, need_extra=False):
logger = logging.getLogger('model_log') # 通过日志名字获取记录器
```
3. qianwen_turbo的限制TPM为500万由于它是plus后的手段稳妥一点qps设为6两个服务器分流即calls=3
```text
@sleep_and_retry
@limits(calls=3, period=1) # 500万tpm每秒最多调用6次两个服务器分流就是3次 plus超限后的保底手段稳妥一点
```
**重点!!**后续阿里扩容之后成倍修改这块**calls=?**
如果不用linux和windows负载均衡这里的calls也要乘2
#### **接口的限制**
1. start_up.py的def create_app()函数限制了对每个接口同时100次请求。这里事实上不再限制了因为100已经足够大了默认限制做到大模型限制这块。
```text
app.connection_limiters['upload'] = ConnectionLimiter(max_connections=100)
app.connection_limiters['get_deviation'] = ConnectionLimiter(max_connections=100)
app.connection_limiters['default'] = ConnectionLimiter(max_connections=100)
app.connection_limiters['judge_zbfile'] = ConnectionLimiter(max_connections=100)
```
2. ConnectionLimiter.py以及每个接口上的装饰器
```text
@require_connection_limit(timeout=1800)
def zbparse():
```
这里限制了每个接口内部执行的时间暂时设置到了30分钟不包括排队时间超时就是解析失败
#### **后端的限制:**
目前后端发起招标请求如果发送超过100max_connections=100个请求我这边会排队后面的请求这时后端的计时器会将这些请求也视作正在解析中事实上它们还在排队等待中这样会导致在极端情况下新进的解析文件速度大于解析的速度排队越来越长后面的文件会因为等待时间过长而直接失败而不是'解析失败'。
### general
是公共函数存放的文件夹llm下是各类大模型读取文件下是docx pdf文件的读取以及文档清理clean_pdf去页眉页脚页码
![1](https://pic.bitday.top/i/2025/03/24/oteo5n-0.png)
general下的llm下的清除file_id.py 需要**每周运行至少一次**防止file_id数量超出我这边对每次请求结束都有file_id记录并清理向应该还没加
llm下的model_continue_query是'模型继续回答'脚本,应对超长文本模型一次无法输出完的情况,继续提问,拼接成完整的内容。
general下的file2markdown是textin 文件--》markdown
general下的format_change是pdf-》docx 或doc/docx->pdf
general下的merge_pdfs.py是拼接文件的1.拼接招标公告+投标人须知 2.拼接评标细则章节+资格审查章节
**general中比较重要的**
**后处理:**
general下的**post_processing**,解析后的后处理部分包括extract_info、 资格审查、技术偏离 商务偏离 所需提交的证明材料,都在这块生成。
post_processing中的**inner_post_processing**专门提取*extracted_info*
post_processing中的**process_functions_in_parallel**提取
资格审查、技术偏离、 商务偏离、 所需提交的证明材料
![1](https://pic.bitday.top/i/2025/03/24/inx4nv-2.png)
大解析upload用了post_processing完整版
little_zbparse.py、小解析main.py用了inner_post_processing
get_deviation.py、偏离表数据解析main.py用了process_functions_in_parallel
**截取pdf**
*截取pdf_main.py*是顶级函数,
二级是*截取pdf货物标版*.py和*截取pdf工程标版.py* 非general下
三级是*截取pdf通用函数.py*
如何判断截取位置是否正确根据output文件夹中的切分情况打开各个文件查看是否切分准确目前的逻辑主要是按大章切分即'招标公告'章节)
**如果切分不准确,如何定位正则表达式?**
首先判断当前是工程标解析还是货物标解析即zb_type=1还是2
如果是2那么是货物标解析那么就是*截取pdf_main.py*调用*截取pdf货物标版*.py如下图selection=1代表截取'招标公告'那么如果招标公告没有切准就在这块修改。这里可以发现get_notice是通用函数即*截取pdf通用函数.py*中的get_notice函数那么继续往内部跳转。
若开头没截准就改begin_pattern末尾没截准就改end_pattern
![1](https://pic.bitday.top/i/2025/03/24/otey2d-0.png)
![1](https://pic.bitday.top/i/2025/03/24/otexfp-0.png)
另外:在*截取pdf货物标版*.py中还有extract_pages_twice函数即第一次没有切分到之后会运行该函数这边又有一套begin_pattern和end_pattern即二次提取
**如何测试?**
![1](https://pic.bitday.top/i/2025/03/24/otewli-0.png)
输入pdf_path和你要切分的序号selection=1代表切公告依次类推可以看切出来的效果如何。
**无效标和废标公共代码**
获取无效标与废标项的主要执行代码。对docx文件进行预处理=》正则=》temp.txt=》大模型筛选
如果提的不全,可能是正则没涵盖到位,也可能是大模型提示词漏选了。
这里如果段落中既被正则匹配又被follow_up_keywords中的任意一个匹配那么不会添加到temp中即不会被大模型筛选它会**直接添加**到最后的返回中!
![1](https://pic.bitday.top/i/2025/03/24/oteybk-0.png)
**投标人须知正文条款提取成json文件**
将截取到的ztbfile_tobidders_notice_part2.pdf 即须知正文转为clause1.json 文件,便于后续提取**开评定标流程**、**投标文件要求**、**重新招标、不再招标和终止招标**
这块的主要逻辑就是匹配形如'一、总则'这样的大章节
然后匹配形如'1.1' '1.1.1'这样的序号由于是按行读取pdf一个序号后面的内容可能有好几行因此遇到下一个序号如'2.1')开头,之前的内容都视为上一个序号的。
### old_version
都是废弃文件代码,未在正式、测试环境中使用的,不用管
![1](https://pic.bitday.top/i/2025/03/24/oteylc-0.png)
### routes
是接口以及主要实现部分,一一对应
![1](https://pic.bitday.top/i/2025/03/24/otevph-0.png)
get_deviation对应偏离表数据解析main获得偏离表数据
judge_zbfile对应判断是否是招标文件
little_zbparse对应小解析main负责解析extract_info
test_zbparse是测试接口无对应
upload对应工程标解析和货物标解析即大解析
**混淆澄清**:小解析可以指代一个过程,即从'投标文件生成'这个入口进去的解析后端会同时调用little_zbparse和get_deviation。这个过程称为'小解析'。
但是little_zbparse也叫小解析命名如此因为最初只需返回这些数据(extract_info),后续才陆续返回商务、技术偏离...
utils是接口这块的公共功能函数。其中validate_and_setup_logger函数对不同的接口请求对应到不同的output文件夹如upload->output1。后续增加接口也可直接在这里写映射关系。
![1](https://pic.bitday.top/i/2025/03/24/otez1u-0.png)
重点关注大解析:**upload.py**和**货物标解析main.py**
### static
存放解析的输出和提示词
其中output用gitignore了git push不会推送这块内容。
各个文件夹(output1 output2..)对应不同的接口请求
![1](https://pic.bitday.top/i/2025/03/24/otf10z-0.png)
### test_case&testdir
test_case是测试用例是对一些函数的测试。好久没更新了
testdir是平时写代码的测试的地方
它们都不影响正式和测试环境的解析
![1](https://pic.bitday.top/i/2025/03/24/oteyur-0.png)
### 工程标&货物标
是两个解析流程中不一样的地方(一样的都写在**general**中了)
![1](https://pic.bitday.top/i/2025/03/24/otewqu-0.png)
主要是货物标额外解析了采购要求提取采购需求main+技术参数要求提取+商务服务其他要求提取)
### 最后:
ConnectionLimiter.py定义了接口超时时间->超时后断开与后端的连接
![1](https://pic.bitday.top/i/2025/03/24/otezwk-0.png)
logger_setup.py 为每个请求创建单独的log每个log对应一个log.txt
start_up.py是启动脚本run_serve也是启动脚本是对start_up.py的简单封装目前dockerfile定义的直接使用run_serve启动
## 持续关注
```text
yield sse_format(tech_deviation_response)
yield sse_format(tech_deviation_star_response)
yield sse_format(zigefuhe_deviation_response)
yield sse_format(shangwu_deviation_response)
yield sse_format(shangwu_star_deviation_response)
yield sse_format(proof_materials_response)
```
1. 工程标解析目前仍没有解析采购要求这一块,因此后处理返回的只有'资格审查'和''证明材料"和"extracted_info",没有''商务偏离''及'商务带星偏离',也没有'技术偏离'和'技术带星偏离',而货物标解析是完全版。
其中''证明材料"和"extracted_info"是直接返给后端保存的
2. 大解析中返回了技术评分,后端接收后不仅显示给前端,还会返给向,用于生成技术偏离表
3. 小解析时get_deviation.py其实也可以返回技术评分但是没有返回因为没人和我对接暂时注释了。
![1](https://pic.bitday.top/i/2025/03/24/otf0av-0.png)
4.商务评议和技术评议偏离表,即评分细则的偏离表,暂时没做,但是**商务评分、技术评分**无论大解析还是小解析都解析了,稍微对该数据处理一下返回给后端就行。
![1](https://pic.bitday.top/i/2025/03/24/otf0bp-0.png)
这个是解析得来的结果,适合给前端展示,但是要生成商务技术评议偏离表的话,需要再调一次大模型,对该数据进行重新归纳,以字符串列表为佳。再传给后端。(未做)
### 如何定位问题
1. 查看static下的output文件夹 upload大解析对应output1
2. docker-compose文件中规定了数据卷挂载的路径- /home/Z/zbparse_output_dev:/flask_project/flask_app/static/output
也就是说static/output映射到了服务器的Z/zbparse_output_dev文件夹
3. 根据时间查找哪个子文件夹uuid作为子文件名
4. 查看是否有final_result.json文件如果有说明解析流程正常结束了问题可能出在后端a.后端接口请求超限30分钟 b.后处理存在解析数据的时候出错)
也可能出现在自身解析可以查看子文件内的log.txt查看日志。
5. 若解析正常有final_result但解析不准可以根据以下定位
a.查看子文件夹下的文件切分是否准确例如如果评标办法不准确那么查看ztbfile_evaluation_methon是否正确切到了评分细则。如果切到了那就改general/商务技术评分提取里的提示词否则修改截取pdf那块关于'评标办法'的正则表达式。
b.总之是**先看切的准不准,再看提示词能否优化**,都要定位到对应的代码中!
## 学习总结
### Flask + Waitress
Flask 和 Waitress 是两个不同层级的工具,在 Python Web 开发中扮演互补角色。它们的协作关系可以概括为:**Flask 负责构建 Web 应用逻辑,而 Waitress 作为生产级服务器承载 Flask 应用**。
```text
# Flask 开发服务器(仅用于开发)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)
# 使用 Waitress 启动(生产环境)
from waitress import serve
serve(app, host='0.0.0.0', port=8080)
```
**Waitress 的工作方式**
- 作为 WSGI 服务器Waitress 作为一个 WSGI 服务器,负责监听指定端口上的网络请求,并将请求传递给 WSGI 应用(如 Flask 应用)。
- 多线程处理默认情况下waitress 在**单个进程**内启用线程池。当请求到达时waitress 会从线程池中分配一个线程来处理这个请求。由于 GIL 限制,同一时间只有一个线程在执行 Python 代码只能使用一个核心CPU利用率只能到100%)。
**Flask 与 waitress 的协同工作**
- **WSGI 接口**Flask 应用实现了 WSGI 接口。waitress 接收到请求后,会调用 Flask 应用对应的视图函数来处理请求,生成响应。
- **请求处理流程**
- 请求进入 waitress
- waitress 分配一个线程并调用 Flask 应用
- Flask 根据路由匹配并执行对应的处理函数
- 处理函数返回响应waitress 将响应发送给客户端
**Waitress 的典型使用场景**
1. **跨平台部署**:尤其适合 Windows 环境Gunicorn 等服务器不支持)。
2. **简单配置**:无需复杂设置即可获得比开发服务器(Flask自带)更强的性能。
3. **中小型应用**对并发要求不极高的场景Waitress 的轻量级特性优势明显。
**Waitress的不足与处理**
由于 waitress 是在单进程下工作,所有线程共享进程内存,如果业务逻辑简单且无复杂资源共享问题,这种方式是足够的。
**引入子进程**:如果需要每个请求实现内存隔离或者绕过 GIL 来利用多核 CPU有时会在 Flask 视图函数内部启动子进程来处理实际任务。
**直接采用多进程部署方案**:使用 Gunicorn 的多 worker 模式
### Gunicorn
**Gunicorn 的工作方式**
- **预启动 Worker 进程**。Gunicorn 启动时,会按照配置数量(例如 4 个 worker创建多个 worker 进程。这些 worker 进程会一直运行,并监听同一个端口上的请求。不会针对每个请求单独创建新进程。
- **共享 socket**:所有 worker 进程共享同一个监听 socket当有请求到来时操作系统会将请求分发给某个空闲的 worker。
推荐worker 数量 = (2 * CPU 核心数) + 1
**如何启动:**
要使用异步 worker你需要
```text
pip install gevent
```
启动 Gunicorn 时指定 worker 类型和数量,例如:
```text
gunicorn -k gevent -w 4 --max-requests 100 flask_app.start_up:create_app --bind 0.0.0.0:5000
```
使用 `-k gevent`(或者 `-k eventlet`)就可以使用异步 worker单个 worker 能够处理多个 I/O 密集型请求。
使用--max-requests 100 。每个 worker 在处理完 100 个请求后会自动重启,从而释放可能累积的内存。
### 本项目的执行流程:
1. 调用CPU进行PDF文件的读取与切分CPU密集型耗时半分钟
2. 针对切分之后的不同部分分别调用大模型得到回答IO密集型耗时2分钟。
解决方案:
1.使用flask+waitresswaitress会为每个用户请求开新的线程处理然后我的代码逻辑会在这个线程内**开子进程**来执行具体的代码以绕过GIL限制且正确释放内存资源。
**后续可以开一个共享的进程池代替为每个请求开子进程。以避免高并发下竞争多核导致的频繁CPU切换问题。
2.使用Gunicorn的异步workergunicorn为固定创建worker进程处理用户请求一个异步 worker 可以同时处理多个用户请求因为当一个请求在等待外部响应例如调用大模型接口worker 可以切换去处理其他请求。
### 全局解释器锁GIL
Python特别是 CPython 实现中有一个叫做全局解释器锁Global Interpreter Lock简称 GIL的机制这个锁确保在任何时刻**只有一个线程**在执行 **Python 字节码。**
这意味着,即使你启动了多个线程,它们在执行 Python 代码时实际上是串行执行的,而不是并行利用多核 CPU。
在 Java 中,多线程通常能充分利用多核,因为 **Java 的线程是真正的系统级线程**,不存在类似 CPython 中的 GIL 限制。
**影响**
- **CPU密集型任务**:由于 GIL 的存在,在 CPU 密集型任务中,多线程往往不能提高性能,因为同时只有一个线程在执行 Python 代码。
- **I/O密集型任务**:如果任务主要等待 I/O例如**网络**、**磁盘读写**),线程在等待时会释放 GIL此时多线程可以提高程序的响应性和吞吐量。
**NumPy**能够在一定程度上绕过 Python 的 GIL 限制。许多 NumPy 的数值计算操作(如矩阵乘法、向量化运算等)是由高度优化的 C 或 Fortran 库(如 BLAS、LAPACK实现的。这些库通常在执行计算密集型任务时会释放 GIL。**C 扩展模块的方式将 C 代码嵌入到 Python 中,从而利用底层 C 库的高性能优势**
### 进程与线程
1、进程是操作系统分配任务的基本单位进程是python中正在运行的程序当我们打开了1个浏览器时就是开始了一个浏览器进程
线程是进程中执行任务的基本单元(执行指令集),一个进程中至少有一个线程、当只有一个线程时,称为**主线程**
2、线程的创建和销毁耗费资源少进程的创建和销毁耗费资源多线程很容易创建进程不容易创建
3、线程的切换速度快进程慢
4、一个进程中有多个线程时线程之间可以进行通信一个进程中有多个子进程时进程与进程之间不可以相互通信如果需要通信时就必须通过一个中间代理实现Queue、Pipe。
5、多进程可以利用多核cpu**多线程不可以利用多核cpu**
6、一个新的线程很容易被创建一个新的进程创建需要对父进程进行一次克隆
7、多进程的主要目的是充分使用CPU的多核机制**多线程的主要目的是充分利用某一个单核**
———————————————
**每个进程有自己的独立 GIL**
**多线程适用于 I/O 密集型任务**
**多进程适用于CPU密集型任务**
**因此,多进程用于充分利用多核,进程内开多线程以充分利用单核。**
#### 进程池
**multiprocessing.Pool库**,通过 `maxtasksperchild` 指定每个子进程在退出前最多执行的任务数,这有助于防止某些任务中可能存在的内存泄漏问题
```text
pool =Pool(processes=10, maxtasksperchild=3)
```
**concurrent.futures.ProcessPoolExecutor**更高级、更统一,没有类似 `maxtasksperchild` 的参数,意味着进程在整个执行期内会一直存活,适合任务本身**比较稳定**的场景。
pool =ProcessPoolExecutor(max_workers=10)
最好创建的进程数**等同于**CPU核心数如果大于且每个进程都是CPU密集型高负债一直用到CPU那么进程之间会竞争CPU导致上下文切换增加反而会降低性质。
设置的工作进程数接近 CPU 核心数,以便每个进程能**独占一个核**运行。
#### 进程、线程间通信
**线程间通信**
- 线程之间可以直接共享全局变量、对象或数据结构,不需要额外的序列化过程,但这也带来了同步的复杂性(如竞态条件)。
```text
import threading
num=0
def work():
global num
for i in range(1000000):
num+=1
print('work',num)
def work1():
global num
for i in range(1000000):
num+=1
print('work1',num)
if __name__ == '__main__':
t1=threading.Thread(target=work)
t2=threading.Thread(target=work1)
t1.start()
t2.start()
t1.join()
t2.join()
print('主线程执行结果',num)
```
运行结果:
```text
work 1551626
work1 1615783
主线程执行结果 1615783
```
这些数值都小于预期的 2000000因为
即使存在 GIL`num += 1` 这样的操作实际上**并不是原子**的。GIL 确保同一时刻只有一个线程执行 Python 字节码,但在执行 `num += 1` 时,实际上会发生下面几步操作:
1. 从内存中读取 `num` 的当前值
2. 对读取到的值进行加 1 操作
3. 将新的值写回到内存
**由多个字节码组成!!!**
因此会导致:
线程 A 读取到 `num` 的值
切换到线程 B线程 B 也读取同样的 `num` 值并进行加 1然后写回
当线程 A 恢复时,它依然基于之前读取的旧值进行加 1最后写回从而覆盖了线程 B 的更新
**解决:**
```text
from threading import Lock
import threading
num=0
def work():
global num
for i in range(1000000):
with lock:
num+=1
print('work',num)
def work1():
global num
for i in range(1000000):
with lock:
num+=1
print('work1',num)
if __name__ == '__main__':
lock=Lock()
t1=threading.Thread(target=work)
t2=threading.Thread(target=work1)
t1.start()
t2.start()
t1.join()
t2.join()
print('主线程执行结果',num)
```
**进程间通信IPC**
- 进程之间默认不共享内存,因此如果需要传递数据,就必须使用专门的通信机制。
- 在 Python 中,可以使用 `multiprocessing.Queue``multiprocessing.Pipe`、共享内存(如 `multiprocessing.Value``multiprocessing.Array`)等方式实现进程间通信。
```text
from multiprocessing import Process, Queue
def worker(process_id, q):
# 每个进程将数据放入队列
q.put(f"data_from_process_{process_id}")
print(f"Process {process_id} finished.")
if __name__ == '__main__':
q = Queue()
processes = []
for i in range(5):
p = Process(target=worker, args=(i, q))
processes.append(p)
p.start()
for p in processes:
p.join()
# 从队列中收集数据
results = []
while not q.empty():
results.append(q.get())
print("Collected data:", results)
```
- 当你在主进程中创建了一个 `Queue` 对象,然后将它作为参数传递给子进程时,子进程会获得一个能够与主进程通信的“句柄”。
- 子进程中的 `q.put(...)` 操作会将数据通过这个管道传送到主进程,而主进程可以通过 `q.get()` 来获取这些数据。
- 这种机制虽然看起来像是“共享”,但实际上是通过 IPC进程间通信实现的而不是直接共享内存中的变量。
### 项目贡献
![1](https://pic.bitday.top/i/2025/03/24/oteynu-0.png)
![1](https://pic.bitday.top/i/2025/03/24/otnt8r-0.png)
![1](https://pic.bitday.top/i/2025/03/24/otewsz-0.png)
![1](https://pic.bitday.top/i/2025/03/24/otnpl5-0.png)
### 效果图
![1](https://pic.bitday.top/i/2025/03/24/otnugp-0.png)
![](https://pic.bitday.top/i/2025/03/24/otnpak-0.gif)
![1](https://pic.bitday.top/i/2025/03/24/otno48-0.png)
![1](https://pic.bitday.top/i/2025/03/24/otnrj7-0.png)
![1](https://pic.bitday.top/i/2025/03/24/otohdw-0.png)