This commit is contained in:
zhangsan 2025-03-18 18:04:27 +08:00
parent 390b9ccc92
commit f7f099a50e
11 changed files with 157 additions and 177 deletions

31
.gitignore vendored
View File

@ -1 +1,30 @@
config.py
# 不提交环境变量文件
.env
# 忽略 Python 编译文件和缓存
*.pyc
__pycache__/
# 如果使用虚拟环境,也可以忽略这些目录(根据实际情况选择)
venv/
ENV/
env/
# IntelliJ IDEA project configuration (including PyCharm and other IDEA-based IDEs)
.idea/
*.iml
*.ipr
*.iws
# Windows 系统生成的文件
Thumbs.db
# macOS 系统生成的文件
.DS_Store
# 日志文件
*.log
logs/
# 可选:忽略 Docker 相关的临时或覆盖配置文件(如果不需要提交)
docker-compose.override.yml

View File

@ -1,6 +1,3 @@
cos_python_sdk_v5==1.9.15
panflute==2.1.3
pypandoc==1.8
pytypecho==2.1.0
qcloud_cos==3.3.6
pymysql==1.0.2
python-dotenv==0.21.0

View File

@ -0,0 +1,33 @@
import uuid
import requests
from urllib.parse import urlparse
import os
import shutil
def download_image(url, output_path, timeout=30):
    """
    Download an image from the network and save it inside ``output_path``.

    The file is stored under a freshly generated UUID name. Its extension is
    taken from the URL path and falls back to ``.png`` when the URL has none.

    Args:
        url: Address of the image to fetch.
        output_path: Directory that receives the downloaded file.
        timeout: Seconds to wait for the server before giving up, so that a
            stalled host cannot block the whole run.

    Returns:
        The generated file name (not the full path) on success, or ``None``
        when the server does not answer 200 or any error occurs. Failures are
        reported on stdout instead of being raised.
    """
    try:
        # The context manager releases the streamed connection on every path.
        with requests.get(url, stream=True, timeout=timeout) as response:
            if response.status_code != 200:
                print(f"警告: 无法下载图片 {url},状态码: {response.status_code}")
                return None
            # Extension from the URL path; default to .png when it is missing.
            ext = os.path.splitext(urlparse(url).path)[1] or '.png'
            new_filename = f"{uuid.uuid4()}{ext}"
            dest_path = os.path.join(output_path, new_filename)
            with open(dest_path, 'wb') as f:
                # Let urllib3 undo gzip/deflate so the raw bytes are the image.
                response.raw.decode_content = True
                shutil.copyfileobj(response.raw, f)
        print(f"已下载: {url} -> {dest_path}")
        return new_filename
    except Exception as e:
        # Best effort: one broken image must not abort the whole conversion.
        print(f"错误: 下载图片 {url} 时出错: {e}")
    return None

View File

@ -2,40 +2,9 @@ import os
import re
import shutil
import uuid
import requests
from urllib.parse import urlparse
from upload_img import upload_image
def download_image(url, output_path, timeout=30):
    """
    Download an image from the network and save it inside ``output_path``.

    The file is stored under a freshly generated UUID name. Its extension is
    taken from the URL path and falls back to ``.png`` when the URL has none.

    Args:
        url: Address of the image to fetch.
        output_path: Directory that receives the downloaded file.
        timeout: Seconds to wait for the server before giving up, so that a
            stalled host cannot block the whole run.

    Returns:
        The generated file name (not the full path) on success, or ``None``
        when the server does not answer 200 or any error occurs. Failures are
        reported on stdout instead of being raised.
    """
    try:
        # The context manager releases the streamed connection on every path.
        with requests.get(url, stream=True, timeout=timeout) as response:
            if response.status_code != 200:
                print(f"警告: 无法下载图片 {url},状态码: {response.status_code}")
                return None
            # Extension from the URL path; default to .png when it is missing.
            ext = os.path.splitext(urlparse(url).path)[1] or '.png'
            new_filename = f"{uuid.uuid4()}{ext}"
            dest_path = os.path.join(output_path, new_filename)
            with open(dest_path, 'wb') as f:
                # Let urllib3 undo gzip/deflate so the raw bytes are the image.
                response.raw.decode_content = True
                shutil.copyfileobj(response.raw, f)
        print(f"已下载: {url} -> {dest_path}")
        return new_filename
    except Exception as e:
        # Best effort: one broken image must not abort the whole conversion.
        print(f"错误: 下载图片 {url} 时出错: {e}")
    return None
def extract_image_paths(content):
"""
Markdown 内容中提取所有图片路径支持 Markdown HTML 格式
@ -209,29 +178,53 @@ def process_md_file_remote(md_file):
print(f"已更新: {md_file}")
def process_md_files(input_path,output_path,type):
def scan_files(base_folder, exclude_folders):
    """
    Collect every Markdown file below ``base_folder``.

    A directory is skipped when its path contains any of the strings in
    ``exclude_folders``. This is a plain substring match, so everything
    below a skipped directory is skipped as well.

    Args:
        base_folder: Root directory to walk.
        exclude_folders: Iterable of path fragments to exclude. ``None`` and
            empty-string entries mean "exclude nothing".

    Returns:
        List of paths of the files whose name ends with ``.md``
        (case-insensitive), each joined onto the directory it was found in.
    """
    # Empty fragments must be dropped: '' is a substring of every path, so a
    # list such as ''.split(',') == [''] would otherwise exclude everything.
    fragments = [name for name in (exclude_folders or []) if name]
    md_files = []
    for root, dirs, files in os.walk(base_folder):
        if any(fragment in root for fragment in fragments):
            # Every descendant path contains the same fragment, so there is
            # no point in descending any further.
            dirs[:] = []
            continue
        md_files.extend(
            os.path.join(root, file)
            for file in files
            if file.lower().endswith('.md')
        )
    return md_files
def process_md_files(input_path, output_path, type, exclude_folders=None):
"""
Process every Markdown file under the input directory and save the processed images to output_path.
The type parameter selects the processing mode:
type == 1: process_md_file_local
type == 2: process_md_file_with_assets
type == 3: process_md_file_remote
"""
# Create the output directory if it does not exist yet.
os.makedirs(output_path, exist_ok=True)
# Build the Markdown file list; a missing exclude list means "exclude nothing".
if exclude_folders is None:
exclude_folders = []
md_files = scan_files(input_path, exclude_folders)
# Walk over and process all Markdown files.
# NOTE(review): two loops follow. The os.walk loop looks like the pre-change
# version shown by the diff view next to the md_files loop; confirm which is live.
for root, _, files in os.walk(input_path):
for file in files:
if file.lower().endswith('.md'):
md_file = os.path.join(root, file)
if type==1:
process_md_file_local(md_file, output_path)
elif type==2:
process_md_file_with_assets(md_file,output_path)
elif type==3:
process_md_file_remote(md_file)
else:
pass
for md_file in md_files:
if type == 1:
process_md_file_local(md_file, output_path) # rewrite URLs to local images stored in output_path
elif type == 2:
process_md_file_with_assets(md_file, output_path) # rewrite URLs to local images; images and md both go to output_path
elif type == 3:
process_md_file_remote(md_file) # rewrite URLs to public links
else:
print(f"未知的处理类型: {type}")
print("处理完成!所有图片已保存至:", os.path.abspath(output_path))
# Script entry point: run process_md_files in mode 1 with hard-coded Windows paths.
if __name__ == "__main__":
type=1
# NOTE(review): input_path and output_path are each assigned twice below. The diff
# view appears to show old and new values together; the later assignment wins. Confirm.
input_path = r'D:\folder\test\tt'
output_path = r'D:\folder\test\output2'
input_path = r'D:\folder\study\md_files\Java\zbparse'
output_path = r'D:\folder\test\output'
process_md_files(input_path,output_path,type)

View File

@ -1,12 +1,6 @@
# Folder handed to scan_files, and the folder names it should leave out.
base_folder = 'D:/Notes/'
exclude_folders = ['工作笔记']
# COS settings
secret_id = 'xxx' # Replace with your SecretId; view and manage it in the CAM console: https://console.cloud.tencent.com/cam/capi
secret_key = 'xxx' # Replace with your SecretKey; view and manage it in the CAM console: https://console.cloud.tencent.com/cam/capi
region = 'ap-shanghai'
bucket = 'xxx'
# Typecho settings
website_xmlrpc_url = '' # e.g. https://www.abc.com/index.php/action/xmlrpc
website_username = 'xxx'

View File

@ -1,17 +0,0 @@
import os.path
from qcloud_cos import CosConfig, CosS3Client
class CosPicUploader:
    """Uploads local files to a single COS bucket and reports their URLs."""

    def __init__(self, secret_id, secret_key, region, bucket):
        """Create the COS client from the credentials and remember the bucket."""
        self.__bucket = bucket
        self.__config = CosConfig(
            Region=region,
            Secret_id=secret_id,
            Secret_key=secret_key,
        )
        self.__client = CosS3Client(self.__config)

    def upload_file(self, key, file_path):
        """Upload ``file_path`` under object key ``key`` and return the object URL."""
        # Use forward slashes so Windows-style paths open the same way.
        posix_style_path = file_path.replace('\\', '/')
        with open(posix_style_path, 'rb') as stream:
            self.__client.put_object(Bucket=self.__bucket, Body=stream, Key=key)
        return self.__client.get_object_url(Bucket=self.__bucket, Key=key)

View File

@ -1,73 +1,79 @@
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import logging
import os.path
import os
from dotenv import load_dotenv
from markdown_file_searcher import scan_files
from markdown_img_searcher import scan_imgs
from cos_pic_uploader import CosPicUploader
import config
# 加载 .env 文件中的环境变量
load_dotenv()
from transfer_md.transfer import process_md_file_remote, scan_files # 假设该模块中实现了相应函数
from typecho_xmlrpc_publisher import TypechoXmlRpcPublisher
from typecho_direct_mysql_publisher import TypechoDirectMysqlPublisher
# NOTE(review): the calls below list both config.* and os.getenv(...) arguments.
# This looks like old and new diff lines shown together; confirm which set is live.
uploader = CosPicUploader(
config.secret_id,
secret_key=config.secret_key,
region=config.region,
bucket=config.bucket
)
# Initialise the publishers, reading the settings directly with os.getenv.
typecho_publisher = TypechoXmlRpcPublisher(
config.website_xmlrpc_url,
config.website_username,
config.website_password
os.getenv('WEBSITE_XMLRPC_URL'),
os.getenv('WEBSITE_USERNAME'),
os.getenv('WEBSITE_PASSWORD')
)
mysql_publisher = TypechoDirectMysqlPublisher(
config.mysql_host,
config.mysql_port,
config.mysql_username,
config.mysql_password,
config.mysql_typecho_database,
config.mysql_typecho_table_prefix
os.getenv('MYSQL_HOST'),
# MYSQL_PORT falls back to 3306 when the variable is not set.
int(os.getenv('MYSQL_PORT', 3306)),
os.getenv('MYSQL_USERNAME'),
os.getenv('MYSQL_PASSWORD'),
os.getenv('MYSQL_TYPECHO_DATABASE'),
os.getenv('MYSQL_TYPECHO_TABLE_PREFIX')
)
def execute_flow_with_typecho_xmlrpc(file_path):
"""
Publish a post through the XML-RPC interface.
process_md_file_remote is used here to process the Markdown file: upload local images and replace their URLs.
"""
# Process the Markdown file first: upload local images and switch them to public addresses.
process_md_file_remote(file_path)
# NOTE(review): the body mixes a scan_imgs/uploader flow with the newer flow
# (old and new diff lines shown together); confirm which lines are live.
with open(file_path, 'r', encoding='utf-8') as file:
file_base_path = os.path.dirname(file_path)
file_base_name = os.path.splitext(os.path.basename(file_path))[0] # file name without extension
file_base_name = os.path.splitext(os.path.basename(file_path))[0]
md_source_text = file.read()
md_img_urls = scan_imgs(file_path)
if len(md_img_urls) > 0:
for md_img_url in md_img_urls:
img_file = os.path.join(file_base_path, md_img_url)
img_file_name = os.path.basename(img_file)
oss_url = uploader.upload_file(key=file_base_name+'-'+img_file_name, file_path=img_file)
md_source_text = md_source_text.replace('](' + md_img_url + ')', '](' + oss_url + ')')
post_id = typecho_publisher.publish_post(file_base_name, md_source_text)
print('发布成功 --> ' + file_base_name + ' - ' + str(post_id))
# category_name = os.path.basename(os.path.dirname(file_path))
# Note: the XML-RPC flow does not need the category_name argument.
post_id = typecho_publisher.publish_post(file_base_name, md_source_text)
print('发布成功 --> ' + file_base_name + ' - ' + str(post_id))
def execute_flow_with_typecho_mysql(file_path):
"""
Publish a post through a direct MySQL connection.
process_md_file_remote is used here to process the Markdown file: upload local images and switch them to public addresses.
The category name is taken from the parent directory of the file path.
"""
# Process the Markdown file first: upload local images and switch them to public addresses.
process_md_file_remote(file_path)
# NOTE(review): the body mixes a scan_imgs/uploader flow with the newer flow
# (old and new diff lines shown together); confirm which lines are live.
with open(file_path, 'r', encoding='utf-8') as file:
file_base_path = os.path.dirname(file_path)
file_base_name = os.path.splitext(os.path.basename(file_path))[0] # file name without extension
category_name = os.path.basename(file_base_path)
file_base_name = os.path.splitext(os.path.basename(file_path))[0] # os.path.basename(path) returns the last component of the path
md_source_text = file.read()
md_img_urls = scan_imgs(file_path)
if len(md_img_urls) > 0:
for md_img_url in md_img_urls:
img_file = os.path.join(file_base_path, md_img_url)
img_file_name = os.path.basename(img_file)
oss_url = uploader.upload_file(key=file_base_name+'-'+img_file_name, file_path=img_file)
md_source_text = md_source_text.replace('](' + md_img_url + ')', '](' + oss_url + ')')
post_id = mysql_publisher.publish_post(file_base_name, md_source_text, category_name)
print('发布成功 --> ' + file_base_name + ' - ' + str(post_id))
# Take the category name from the directory that contains the file.
category_name = os.path.basename(os.path.dirname(file_path)) # os.path.dirname(path) returns the directory part (drops the last component)
post_id = mysql_publisher.publish_post(file_base_name, md_source_text, category_name)
print('发布成功 --> ' + file_base_name + ' - ' + str(post_id))
if __name__ == '__main__':
    logging.basicConfig(level='ERROR')

    # Read the scan root and the exclusion list from the environment.
    base_folder = os.getenv('BASE_FOLDER')
    if not base_folder:
        # Fail with a readable message instead of a TypeError inside os.walk.
        raise SystemExit('BASE_FOLDER is not set; nothing to scan.')

    # ''.split(',') is [''], and an empty fragment is a substring of every
    # path, so blank entries must be dropped or every folder gets excluded.
    exclude_folders = [
        name.strip()
        for name in os.getenv('EXCLUDE_FOLDERS', '').split(',')
        if name.strip()
    ]

    files = scan_files(base_folder, exclude_folders)
    for md_file in files:
        # Pick the publishing flow to use:
        # execute_flow_with_typecho_xmlrpc(md_file)
        execute_flow_with_typecho_mysql(md_file)

View File

@ -1,27 +0,0 @@
import os
from os import path
# md文件扫描
def __scaner_files(results, file_path, exclude_folders=None):
    """
    Recursively append the absolute paths of ``.md`` files under ``file_path``.

    Args:
        results: List that receives the matches (extended in place).
        file_path: Directory whose entries are listed.
        exclude_folders: Directory names that must not be descended into.
            ``None`` means no directory is excluded.
    """
    excluded = exclude_folders or ()
    for entry in os.listdir(file_path):
        real_path = path.join(file_path, entry)
        if path.isfile(real_path):
            # Files: keep the absolute path of Markdown files only.
            if entry.endswith('.md'):
                results.append(path.abspath(real_path))
        elif path.isdir(real_path):
            # Directories: recurse unless the folder name is excluded.
            if entry not in excluded:
                __scaner_files(results, real_path, excluded)
        else:
            # Neither a regular file nor a directory (e.g. a broken symlink).
            print("error")


def scan_files(file_path, exclude_folders):
    """
    Return the absolute paths of all ``.md`` files below ``file_path``.

    Args:
        file_path: Root directory of the scan.
        exclude_folders: Directory names to skip at any depth; may be ``None``.

    Returns:
        A new list of absolute file paths, in directory-listing order.
    """
    results = []
    __scaner_files(results, file_path, exclude_folders)
    return results

View File

@ -1,30 +0,0 @@
import io
import os.path
import panflute
import pypandoc
# 读取md图片地址
def __prepare(doc):
    """Give ``doc`` fresh, empty ``images`` and ``links`` lists."""
    doc.images, doc.links = [], []
def __action(elem, doc):
    """Record ``elem`` on ``doc`` when it is a panflute Image or Link."""
    if isinstance(elem, panflute.Image):
        doc.images.append(elem)
        return
    if isinstance(elem, panflute.Link):
        doc.links.append(elem)
def scan_imgs(file_path):
    """Return the URL of every image element found in the document at ``file_path``."""
    # pandoc emits the document as JSON, which panflute loads as a Doc.
    pandoc_json = pypandoc.convert_file(file_path, 'json')
    document = panflute.load(io.StringIO(pandoc_json))
    document.images, document.links = [], []
    document = panflute.run_filter(__action, prepare=__prepare, doc=document)
    return [image.url for image in document.images]

View File

@ -1,3 +1,4 @@
#typecho_direct_mysql_publisher.py
import pymysql
import time

View File

@ -1,3 +1,4 @@
#typecho_xmlrpc_publisher.py
# typecho api调用
from pytypecho import Post, Typecho