first commit

This commit is contained in:
zhangsan 2025-03-18 15:53:40 +08:00
commit 390b9ccc92
18 changed files with 608 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
config.py

8
.idea/.gitignore generated vendored Normal file
View File

@ -0,0 +1,8 @@
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

View File

@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

4
.idea/misc.xml generated Normal file
View File

@ -0,0 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.8 (reptile) (2)" project-jdk-type="Python SDK" />
</project>

8
.idea/modules.xml generated Normal file
View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/transfer_md.iml" filepath="$PROJECT_DIR$/.idea/transfer_md.iml" />
</modules>
</component>
</project>

8
.idea/transfer_md.iml generated Normal file
View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

6
.idea/vcs.xml generated Normal file
View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>

8
README.md Normal file
View File

@ -0,0 +1,8 @@
将本地文件夹下的markdown文件发布到typecho的站点中
### TODO
- [x] 将markdown发布到typecho
- [x] 发布前将markdown的图片资源上传到TencentCloud的COS中, 并替换markdown中的图片链接
- [x] 将md所在的文件夹名称作为post的category(mysql发布可以插入category, xmlrpc接口暂时不支持category操作)
- [ ] category的层级
- [ ] 发布前先获取所有post信息, 不发布已经发布过的post

6
requirements.txt Normal file
View File

@ -0,0 +1,6 @@
cos_python_sdk_v5==1.9.15
panflute==2.1.3
pypandoc==1.8
pytypecho==2.1.0
qcloud_cos==3.3.6
pymysql==1.0.2

237
transfer_md/transfer.py Normal file
View File

@ -0,0 +1,237 @@
import os
import re
import shutil
import uuid
import requests
from urllib.parse import urlparse
from upload_img import upload_image
def download_image(url, output_path, timeout=30):
    """
    Download an image from the network and save it inside ``output_path``.

    The file is stored under a freshly generated UUID name. The extension is
    taken from the path component of the URL and falls back to ``.png`` when
    the URL has none.

    Args:
        url: HTTP(S) address of the image.
        output_path: Directory the image is written into (must already exist).
        timeout: Seconds to wait for the server before giving up, so that one
            unresponsive host cannot stall the whole run.

    Returns:
        The generated file name (not the full path) on success, or ``None``
        when the server answers with a non-200 status or any error occurs.
    """
    try:
        # The context manager closes the streamed response, releasing the
        # connection even when saving the body fails half-way.
        with requests.get(url, stream=True, timeout=timeout) as response:
            if response.status_code != 200:
                print(f"警告: 无法下载图片 {url},状态码: {response.status_code}")
                return None

            # Keep the original extension; default to .png when the URL has none.
            ext = os.path.splitext(urlparse(url).path)[1] or '.png'
            new_filename = f"{uuid.uuid4()}{ext}"
            dest_path = os.path.join(output_path, new_filename)

            with open(dest_path, 'wb') as f:
                # Let urllib3 undo gzip/deflate transfer encoding while copying.
                response.raw.decode_content = True
                shutil.copyfileobj(response.raw, f)

        print(f"已下载: {url} -> {dest_path}")
        return new_filename
    except Exception as e:
        # Deliberately broad: reading response.raw can raise urllib3 errors that
        # are not requests exceptions, and callers rely on the "None on any
        # failure" contract instead of handling exceptions themselves.
        print(f"错误: 下载图片 {url} 时出错: {e}")
    return None
# Markdown image: ![alt](path) or ![alt](path "title"). Only the path is
# captured; an optional quoted title after whitespace is left out.
_MD_IMAGE_RE = re.compile(r'!\[.*?\]\((.*?)(?:\s+(?:"[^"]*"|\'[^\']*\'))?\)')
# HTML image: <img ... src="path"> or <img ... src='path'>.
_HTML_IMAGE_RE = re.compile(r'<img\s+[^>]*src\s*=\s*(["\'])(.*?)\1')


def extract_image_paths(content):
    """
    Extract every image path referenced in Markdown text.

    Both Markdown image syntax (``![alt](path)``) and HTML ``<img src=...>``
    tags are recognised. An optional Markdown title (``![alt](path "title")``)
    is not part of the returned path, and empty references are ignored.

    Args:
        content: Markdown source text.

    Returns:
        A set with the distinct image paths/URLs exactly as written in the text.
    """
    paths = {match.group(1) for match in _MD_IMAGE_RE.finditer(content)}
    paths.update(match.group(2) for match in _HTML_IMAGE_RE.finditer(content))
    # "![]()" or src="" carry no usable path.
    paths.discard('')
    return paths
def process_local_image_copy(abs_img_path, dest_folder):
    """
    Copy a local image into ``dest_folder`` under a UUID-based file name.

    The original extension is kept so the copy is still recognised as an image.

    Args:
        abs_img_path: Path of the image to copy.
        dest_folder: Existing directory that receives the copy.

    Returns:
        The generated file name (without the directory part).
    """
    _, suffix = os.path.splitext(abs_img_path)
    unique_name = str(uuid.uuid4()) + suffix
    # copy2 also carries over file metadata such as timestamps.
    shutil.copy2(abs_img_path, os.path.join(dest_folder, unique_name))
    return unique_name
def process_md_file_local(md_file, output_path):
    """
    Collect all images of one Markdown file into ``output_path``, rewriting the file in place.

    - Image paths are extracted from both Markdown and HTML image syntax.
    - Local images are copied to ``output_path``; network images are
      downloaded to ``output_path``.
    - Every stored image gets a UUID file name that keeps its extension.
    - References in the Markdown file are replaced by the path of the stored
      image (``output_path`` joined with the new name, forward slashes only).
    - Images that cannot be found, copied or downloaded keep their reference.

    Args:
        md_file: Path of the Markdown file; it is overwritten with the result.
        output_path: Existing directory that receives the images.
    """
    with open(md_file, 'r', encoding='utf-8') as f:
        content = f.read()

    # Relative image paths are resolved against the Markdown file's folder.
    md_dir = os.path.dirname(md_file)

    # Longest references first: str.replace() is a plain substring replacement,
    # so rewriting "a.png" before "img/a.png" would corrupt the longer one.
    # The secondary key keeps the processing order deterministic.
    img_paths = sorted(extract_image_paths(content), key=lambda p: (-len(p), p))

    for img_path in img_paths:
        if img_path.startswith(('http://', 'https://')):
            # Network image: download it; on failure keep the original link.
            new_filename = download_image(img_path, output_path)
            if not new_filename:
                continue
        else:
            # Local image: resolve to a concrete path first.
            if os.path.isabs(img_path):
                abs_img_path = img_path
            else:
                abs_img_path = os.path.normpath(os.path.join(md_dir, img_path))

            if not os.path.exists(abs_img_path):
                print(f"警告: 图片文件不存在 {abs_img_path}")
                continue
            if not os.path.isfile(abs_img_path):
                # A directory is not an image; leave the reference untouched.
                print(f"警告: 跳过文件夹 {abs_img_path}")
                continue

            new_filename = process_local_image_copy(abs_img_path, output_path)
            print(f"已复制: {abs_img_path} -> {os.path.join(output_path, new_filename)}")

        # Forward slashes keep the reference valid Markdown on Windows too.
        new_ref = os.path.join(output_path, new_filename).replace('\\', '/')
        content = content.replace(img_path, new_ref)

    # Write the rewritten document back over the original file.
    with open(md_file, 'w', encoding='utf-8') as f:
        f.write(content)
    print(f"已更新: {md_file}")
def process_md_file_with_assets(md_file, output_base_path):
    """
    Export one Markdown file as a self-contained folder.

    The result is written to ``output_base_path/<md_name>/<md_file_name>``,
    and every image that could be copied or downloaded is stored in
    ``output_base_path/<md_name>/assets/`` under a UUID file name. References
    to those images are rewritten to the relative form
    ``assets/<new_filename>``. The source Markdown file is not modified.

    Note: two Markdown files with the same base name map to the same target
    folder, so the later one overwrites the earlier one's Markdown file.

    Args:
        md_file: Path of the source Markdown file.
        output_base_path: Directory under which the per-document folder is created.
    """
    # Create the per-document folder together with its assets sub-folder.
    md_filename = os.path.basename(md_file)
    md_name, _ = os.path.splitext(md_filename)
    target_folder = os.path.join(output_base_path, md_name)
    assets_folder = os.path.join(target_folder, "assets")
    os.makedirs(assets_folder, exist_ok=True)

    with open(md_file, 'r', encoding='utf-8') as f:
        content = f.read()

    # Relative image paths are resolved against the Markdown file's folder.
    md_dir = os.path.dirname(md_file)

    # Longest references first: str.replace() is a plain substring replacement,
    # so rewriting "a.png" before "img/a.png" would corrupt the longer one.
    # The secondary key keeps the processing order deterministic.
    img_paths = sorted(extract_image_paths(content), key=lambda p: (-len(p), p))

    for img_path in img_paths:
        new_filename = None
        if img_path.startswith(('http://', 'https://')):
            # Network image: download it into the assets folder.
            try:
                new_filename = download_image(img_path, assets_folder)
            except Exception as e:
                print(f"错误: 下载图片 {img_path} 时出错: {e}")
        else:
            # Local image: resolve to a concrete path first.
            if os.path.isabs(img_path):
                abs_img_path = img_path
            else:
                abs_img_path = os.path.normpath(os.path.join(md_dir, img_path))

            # isfile() is False for missing paths as well as for directories.
            if os.path.isfile(abs_img_path):
                try:
                    new_filename = process_local_image_copy(abs_img_path, assets_folder)
                    print(f"已复制: {abs_img_path} -> {os.path.join(assets_folder, new_filename)}")
                except PermissionError as e:
                    print(f"错误: 无法复制文件 {abs_img_path},权限被拒绝: {e}")
            else:
                print(f"警告: 图片文件不存在或不是文件 {abs_img_path}")

        # Only rewrite references whose image was actually stored.
        if new_filename:
            content = content.replace(img_path, f"assets/{new_filename}")

    # Write the rewritten document into the per-document folder.
    target_md_path = os.path.join(target_folder, md_filename)
    with open(target_md_path, 'w', encoding='utf-8') as f:
        f.write(content)
    print(f"已更新: {target_md_path}")
def process_md_file_remote(md_file):
    """
    Upload the local images of one Markdown file and rewrite the file in place.

    - Image paths are extracted from both Markdown and HTML image syntax.
    - Each existing local image is passed to ``upload_image`` and its
      reference is replaced by the public URL that call returns.
    - Network images (http/https) are left unchanged.
    - Images that are missing or fail to upload keep their reference.

    Args:
        md_file: Path of the Markdown file; it is overwritten with the result.
    """
    with open(md_file, 'r', encoding='utf-8') as f:
        content = f.read()

    # Relative image paths are resolved against the Markdown file's folder.
    md_dir = os.path.dirname(md_file)

    # Longest references first: str.replace() is a plain substring replacement,
    # so rewriting "a.png" before "img/a.png" would corrupt the longer one.
    # The secondary key keeps the processing order deterministic.
    img_paths = sorted(extract_image_paths(content), key=lambda p: (-len(p), p))

    for img_path in img_paths:
        if img_path.startswith(('http://', 'https://')):
            print(f"跳过网络图片: {img_path}")
            continue

        if os.path.isabs(img_path):
            abs_img_path = img_path
        else:
            abs_img_path = os.path.normpath(os.path.join(md_dir, img_path))

        # isfile() is False for missing paths as well as for directories.
        if not os.path.isfile(abs_img_path):
            print(f"警告: 图片文件不存在 {abs_img_path}")
            continue

        try:
            public_url = upload_image(abs_img_path)
            print(f"图片已上传: {abs_img_path} -> {public_url}")
            content = content.replace(img_path, public_url)
        except Exception as e:
            # One failed upload must not abort the remaining images.
            print(f"错误: 图片上传失败 {abs_img_path}: {e}")

    with open(md_file, 'w', encoding='utf-8') as f:
        f.write(content)
    print(f"已更新: {md_file}")
def process_md_files(input_path, output_path, type):
    """
    Process every Markdown file found below ``input_path``.

    Args:
        input_path: Directory that is walked recursively for ``.md`` files
            (the extension is matched case-insensitively).
        output_path: Directory that receives images/exports; created when missing.
        type: Processing mode.
            1 - copy/download images into ``output_path`` and rewrite the
                Markdown files in place (``process_md_file_local``);
            2 - export each file into its own folder with an ``assets``
                sub-folder (``process_md_file_with_assets``);
            3 - upload local images and rewrite the Markdown files in place
                (``process_md_file_remote``); ``output_path`` is not used for
                the images in this mode.

    Raises:
        ValueError: If ``type`` is not 1, 2 or 3. Previously an unknown mode
            silently processed nothing and still reported success.
    """
    # NOTE: `type` shadows the builtin, but it is part of the public signature
    # and may be passed by keyword, so the name is kept.
    if type not in (1, 2, 3):
        raise ValueError(f"unsupported type {type!r}; expected 1, 2 or 3")

    os.makedirs(output_path, exist_ok=True)

    for root, _, files in os.walk(input_path):
        for file in files:
            if not file.lower().endswith('.md'):
                continue
            md_file = os.path.join(root, file)
            if type == 1:
                process_md_file_local(md_file, output_path)
            elif type == 2:
                process_md_file_with_assets(md_file, output_path)
            else:
                process_md_file_remote(md_file)

    print("处理完成!所有图片已保存至:", os.path.abspath(output_path))
if __name__ == "__main__":
    # Sample run with hard-coded paths; adjust them before executing the script.
    # The mode is named `mode` instead of `type` so the builtin is not shadowed
    # at module level. See process_md_files for the meaning of each mode.
    mode = 1
    input_path = r'D:\folder\test\tt'
    output_path = r'D:\folder\test\output2'
    process_md_files(input_path, output_path, mode)

50
transfer_md/upload_img.py Normal file
View File

@ -0,0 +1,50 @@
import requests
# Defaults used when the caller does not override them.
# NOTE(review): the token is a credential committed to source control; consider
# rotating it and loading it from configuration or the environment instead.
_DEFAULT_API_URL = "https://pic.bitday.top/api/index.php"
_DEFAULT_TOKEN = "3b54c300cba118d185a4f9d2da9af513"


def upload_image(
    img_path: str,
    *,
    api_url: str = _DEFAULT_API_URL,
    token: str = _DEFAULT_TOKEN,
    timeout: float = 30,
) -> str:
    """
    Upload a local image to the easyimage image host and return its public URL.

    The request is a multipart POST: the file is sent in the ``image`` field
    and the API token in the ``token`` body field. The public address is read
    from the ``url`` field of the JSON response.

    Args:
        img_path: Path of the local image.
        api_url: Upload endpoint; defaults to the project's image host.
        token: API token sent with the upload.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The public URL of the uploaded image.

    Raises:
        Exception: On any failure (unreadable file, network error, non-200
            status, response without a ``url`` field). The original error is
            attached as ``__cause__``.
    """
    try:
        with open(img_path, "rb") as f:
            response = requests.post(
                api_url,
                files={"image": f},
                data={"token": token},
                timeout=timeout,
            )

        if response.status_code != 200:
            raise Exception(f"上传失败,状态码: {response.status_code}, 响应内容: {response.text}")

        public_url = response.json().get("url")
        if not public_url:
            raise ValueError("响应中未找到图片地址")
        return public_url
    except Exception as e:
        # Callers only catch Exception, so every failure is reported through
        # one uniform message; chaining keeps the root cause for debugging.
        raise Exception(f"上传过程中发生错误: {e}") from e
# Example invocation
if __name__ == "__main__":
    # Replace with the path of a real image before running.
    img_path = r"C:\Users\zhangsan\Pictures\社会实践\1.png"
    try:
        print("图片上传成功,公网地址:", upload_image(img_path))
    except Exception as err:
        print("图片上传失败:", err)

View File

@ -0,0 +1,21 @@
# Root folder that is scanned recursively for markdown files to publish
# (presumably this is the `config` module imported by the publisher script; verify).
base_folder = 'D:/Notes/'
# Folder names that are skipped while scanning base_folder.
exclude_folders = ['工作笔记']
# Tencent Cloud COS config
secret_id = 'xxx' # Replace with your SecretId; view and manage it in the CAM console: https://console.cloud.tencent.com/cam/capi
secret_key = 'xxx' # Replace with your SecretKey; view and manage it in the CAM console: https://console.cloud.tencent.com/cam/capi
region = 'ap-shanghai'
bucket = 'xxx'
# typecho config
website_xmlrpc_url = '' # e.g. https://www.abc.com/index.php/action/xmlrpc
website_username = 'xxx'
website_password = 'xxx'
# mysql config
mysql_host = 'localhost'
mysql_port = 3306
mysql_username = 'xxx'
mysql_password = 'xxx'
mysql_typecho_database = 'typecho'
# Prefix of the typecho tables, e.g. 'typecho_' + 'contents'.
mysql_typecho_table_prefix = 'typecho_'

View File

@ -0,0 +1,17 @@
import os.path
from qcloud_cos import CosConfig, CosS3Client
class CosPicUploader:
    """Uploads local files to a Tencent Cloud COS bucket."""

    def __init__(self, secret_id, secret_key, region, bucket):
        """
        Create the COS client used for all uploads.

        Args:
            secret_id: COS API SecretId.
            secret_key: COS API SecretKey.
            region: COS region name, e.g. ``ap-shanghai``.
            bucket: Name of the bucket that receives the files.
        """
        self.__bucket = bucket
        self.__config = CosConfig(
            Region=region,
            Secret_id=secret_id,
            Secret_key=secret_key,
        )
        self.__client = CosS3Client(self.__config)

    def upload_file(self, key, file_path):
        """
        Upload one local file and return its object URL.

        Args:
            key: Object key under which the file is stored in the bucket.
            file_path: Local path of the file; backslashes are converted to
                forward slashes before the file is opened.

        Returns:
            The value returned by the client's ``get_object_url`` for ``key``.
        """
        normalized_path = file_path.replace('\\', '/')
        with open(normalized_path, 'rb') as stream:
            self.__client.put_object(Bucket=self.__bucket, Body=stream, Key=key)
        return self.__client.get_object_url(Bucket=self.__bucket, Key=key)

View File

@ -0,0 +1,73 @@
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import logging
import os.path
from markdown_file_searcher import scan_files
from markdown_img_searcher import scan_imgs
from cos_pic_uploader import CosPicUploader
import config
from typecho_xmlrpc_publisher import TypechoXmlRpcPublisher
from typecho_direct_mysql_publisher import TypechoDirectMysqlPublisher
# Shared service objects, created once at import time from the values in config.
# NOTE(review): these constructors run on import; TypechoDirectMysqlPublisher
# appears to open its MySQL connection in __init__ - confirm before importing
# this module from other code.

# Uploads images referenced by the markdown files to COS.
uploader = CosPicUploader(
    secret_id=config.secret_id,
    secret_key=config.secret_key,
    region=config.region,
    bucket=config.bucket,
)

# Publishes posts through the typecho XML-RPC interface.
typecho_publisher = TypechoXmlRpcPublisher(
    xmlrpc_url=config.website_xmlrpc_url,
    username=config.website_username,
    password=config.website_password,
)

# Publishes posts by writing directly into the typecho MySQL tables.
mysql_publisher = TypechoDirectMysqlPublisher(
    host=config.mysql_host,
    port=config.mysql_port,
    user=config.mysql_username,
    password=config.mysql_password,
    database=config.mysql_typecho_database,
    table_prefix=config.mysql_typecho_table_prefix,
)
def execute_flow_with_typecho_xmlrpc(file_path):
    """
    Publish one markdown file through the typecho XML-RPC interface.

    Local images referenced by the file are uploaded with ``uploader`` first
    and their ``](path)`` references are replaced by the returned URLs. The
    post title is the file name without its extension.

    Args:
        file_path: Path of the markdown file to publish.
    """
    file_base_path = os.path.dirname(file_path)
    file_base_name = os.path.splitext(os.path.basename(file_path))[0]  # file name without extension

    # Read eagerly so the file handle is not held open during the network calls.
    with open(file_path, 'r', encoding='utf-8') as file:
        md_source_text = file.read()

    # dict.fromkeys() drops duplicates while keeping document order, so an
    # image referenced several times is uploaded only once.
    for md_img_url in dict.fromkeys(scan_imgs(file_path)):
        # Remote and inline images are not files below the markdown folder;
        # joining them onto the folder path would make the upload fail.
        if md_img_url.startswith(('http://', 'https://', 'data:')):
            continue
        img_file = os.path.join(file_base_path, md_img_url)
        img_file_name = os.path.basename(img_file)
        # Prefix the key with the post name to avoid clashes between posts.
        oss_url = uploader.upload_file(key=file_base_name + '-' + img_file_name, file_path=img_file)
        md_source_text = md_source_text.replace('](' + md_img_url + ')', '](' + oss_url + ')')

    post_id = typecho_publisher.publish_post(file_base_name, md_source_text)
    print('发布成功 --> ' + file_base_name + ' - ' + str(post_id))
def execute_flow_with_typecho_mysql(file_path):
    """
    Publish one markdown file by writing directly into the typecho database.

    Local images referenced by the file are uploaded with ``uploader`` first
    and their ``](path)`` references are replaced by the returned URLs. The
    post title is the file name without its extension, and the name of the
    folder containing the file is used as the post category.

    Args:
        file_path: Path of the markdown file to publish.
    """
    file_base_path = os.path.dirname(file_path)
    file_base_name = os.path.splitext(os.path.basename(file_path))[0]  # file name without extension
    category_name = os.path.basename(file_base_path)

    # Read eagerly so the file handle is not held open during the network calls.
    with open(file_path, 'r', encoding='utf-8') as file:
        md_source_text = file.read()

    # dict.fromkeys() drops duplicates while keeping document order, so an
    # image referenced several times is uploaded only once.
    for md_img_url in dict.fromkeys(scan_imgs(file_path)):
        # Remote and inline images are not files below the markdown folder;
        # joining them onto the folder path would make the upload fail.
        if md_img_url.startswith(('http://', 'https://', 'data:')):
            continue
        img_file = os.path.join(file_base_path, md_img_url)
        img_file_name = os.path.basename(img_file)
        # Prefix the key with the post name to avoid clashes between posts.
        oss_url = uploader.upload_file(key=file_base_name + '-' + img_file_name, file_path=img_file)
        md_source_text = md_source_text.replace('](' + md_img_url + ')', '](' + oss_url + ')')

    post_id = mysql_publisher.publish_post(file_base_name, md_source_text, category_name)
    print('发布成功 --> ' + file_base_name + ' - ' + str(post_id))
if __name__ == '__main__':
    # Only errors are logged.
    logging.basicConfig(level='ERROR')
    # Posts are published through the direct-MySQL flow; to publish through
    # XML-RPC instead, call execute_flow_with_typecho_xmlrpc(md_file) here.
    for md_file in scan_files(config.base_folder, config.exclude_folders):
        execute_flow_with_typecho_mysql(md_file)

View File

@ -0,0 +1,27 @@
import os
from os import path
# Recursive markdown file scanner
def __scaner_files(results, file_path, exclude_folders=()):
    """
    Collect the absolute paths of all ``.md`` files below ``file_path``.

    Args:
        results: List that the found paths are appended to (modified in place).
        file_path: Directory to scan.
        exclude_folders: Directory names that are not descended into. The
            default is an immutable empty tuple rather than a shared list.
    """
    # Sorted so the resulting order is stable across runs and platforms.
    for entry in sorted(os.listdir(file_path)):
        real_path = path.join(file_path, entry)
        if path.isfile(real_path):
            # Files: keep the absolute path of markdown files only.
            if entry.endswith('.md'):
                results.append(path.abspath(real_path))
        elif path.isdir(real_path):
            # Directories: recurse unless the name is excluded.
            if entry not in exclude_folders:
                __scaner_files(results, real_path, exclude_folders)
        else:
            # Neither a file nor a directory (e.g. a broken symlink).
            print(f"error: skipping unsupported path {real_path}")
def scan_files(file_path, exclude_folders):
    """
    Return the absolute paths of all ``.md`` files below ``file_path``.

    Args:
        file_path: Directory to scan recursively.
        exclude_folders: Directory names that are skipped.

    Returns:
        A new list with one absolute path per markdown file found.
    """
    found = []
    __scaner_files(found, file_path, exclude_folders=exclude_folders)
    return found

View File

@ -0,0 +1,30 @@
import io
import os.path
import panflute
import pypandoc
# Collect the image addresses of a markdown document
def __prepare(doc):
    """Reset the ``images`` and ``links`` collections on ``doc`` to empty lists."""
    doc.images, doc.links = [], []
def __action(elem, doc):
    """
    Record image and link elements on ``doc``.

    Image elements are appended to ``doc.images`` and link elements to
    ``doc.links``; every other element is ignored. Nothing is returned.
    """
    if isinstance(elem, panflute.Image):
        doc.images.append(elem)
        return
    if isinstance(elem, panflute.Link):
        doc.links.append(elem)
def scan_imgs(file_path):
    """
    Return the URLs of all images referenced in a markdown file.

    The file is converted to pandoc's JSON representation, loaded as a
    panflute document and walked with ``__action``, which collects the image
    elements.

    Args:
        file_path: Path of the markdown file.

    Returns:
        A list with the ``url`` of every collected image element.
    """
    ast_json = pypandoc.convert_file(file_path, 'json')
    doc = panflute.load(io.StringIO(ast_json))
    # Initialise the collections up front; __prepare resets them again when
    # the filter starts.
    doc.images, doc.links = [], []
    doc = panflute.run_filter(__action, prepare=__prepare, doc=doc)
    return [image.url for image in doc.images]

View File

@ -0,0 +1,87 @@
import pymysql
import time
from pymysql.converters import escape_string
class TypechoDirectMysqlPublisher:
    """
    Publishes posts by writing rows directly into a typecho MySQL database.

    All values are passed to MySQL as query parameters, so titles, contents
    and category names may contain quotes or other special characters. Table
    names cannot be parameterized; they are built from the configured prefix.
    """

    def __init__(self, host, port, user, password, database, table_prefix):
        """
        Connect to the database and load the existing categories.

        Args:
            host: MySQL host name.
            port: MySQL port.
            user: MySQL user name.
            password: MySQL password.
            database: Name of the typecho database.
            table_prefix: Prefix of the typecho tables, e.g. ``typecho_``.
        """
        self.__table_prefix = table_prefix
        self.__categories_table_name = table_prefix + 'metas'
        self.__relationships_table_name = table_prefix + 'relationships'
        self.__contents_table_name = table_prefix + 'contents'
        self.__db = pymysql.connect(
            host=host,
            port=port,
            user=user,
            password=password,
            database=database,
            charset='utf8mb4'
        )
        self.__init_categories()

    def close(self):
        """Close the database connection."""
        self.__db.close()

    def __init_categories(self):
        """Load the ``mid`` and ``name`` of every category row into the cache."""
        sql = f"SELECT mid, name FROM {self.__categories_table_name} WHERE type=%s"
        with self.__db.cursor() as cursor:
            cursor.execute(sql, ('category',))
            rows = cursor.fetchall()
        self.__exist_categories = [{'mid': row[0], 'name': row[1]} for row in rows]

    def __get_category_id(self, category_name):
        """Return the cached ``mid`` of ``category_name``, or -1 when it is unknown."""
        return next(
            (item['mid'] for item in self.__exist_categories if item['name'] == category_name),
            -1,
        )

    def __add_category(self, category_name):
        """
        Insert a new category (the slug equals the name) and return its ``mid``.

        The insert is committed immediately; on failure it is rolled back and
        the error is re-raised. The category cache is reloaded afterwards.
        """
        sql = (
            f"INSERT INTO {self.__categories_table_name} "
            "(`name`, `slug`, `type`, `description`, `count`, `order`, `parent`) "
            "VALUES "
            "(%s, %s, 'category', '', 0, 1, 0)"
        )
        try:
            with self.__db.cursor() as cursor:
                cursor.execute(sql, (category_name, category_name))
                mid = cursor.lastrowid
            self.__db.commit()
        except Exception:
            self.__db.rollback()
            raise
        self.__init_categories()
        return mid

    def __insert_relationship(self, cursor, cid, mid):
        """Link post ``cid`` to category ``mid`` using the given cursor."""
        sql = (
            f"INSERT INTO {self.__relationships_table_name} "
            "(`cid`, `mid`) "
            "VALUES "
            "(%s, %s)"
        )
        cursor.execute(sql, (cid, mid))

    def __update_category_count(self, cursor, mid):
        """Increase the post counter of category ``mid`` by one."""
        sql = f"UPDATE {self.__categories_table_name} SET `count`=`count`+1 WHERE mid=%s"
        cursor.execute(sql, (mid,))

    def publish_post(self, title, content, category):
        """
        Insert a published post and attach it to a category.

        The category is created when it does not exist yet. The content is
        prefixed with ``<!--markdown-->``, and the slug is set to the new
        post's id. The post row, its category link and the category counter
        are committed together; on failure they are rolled back and the error
        is re-raised.

        Args:
            title: Post title.
            content: Markdown text of the post.
            category: Name of the category the post belongs to.

        Returns:
            The ``cid`` of the inserted post.
        """
        content = '<!--markdown-->' + content

        mid = self.__get_category_id(category)
        if mid < 0:
            mid = self.__add_category(category)

        now_time_int = int(time.time())
        insert_sql = (
            f"INSERT INTO {self.__contents_table_name} "
            "(`title`, `slug`, `created`, `modified`, `text`, `order`, `authorId`, `template`, `type`, `status`, `password`, `commentsNum`, `allowComment`, `allowPing`, `allowFeed`, `parent`) "
            "VALUES "
            "(%s, NULL, %s, %s, %s, 0, 1, NULL, 'post', 'publish', NULL, 0, '1', '1', '1', 0)"
        )
        update_slug_sql = f"UPDATE {self.__contents_table_name} SET slug=%s WHERE cid=%s"

        try:
            with self.__db.cursor() as cursor:
                cursor.execute(insert_sql, (title, now_time_int, now_time_int, content))
                cid = cursor.lastrowid
                # The slug is only known once the row has an id.
                cursor.execute(update_slug_sql, (cid, cid))
                self.__insert_relationship(cursor, cid=cid, mid=mid)
                self.__update_category_count(cursor, mid)
            self.__db.commit()
        except Exception:
            # Do not leave a half-written post pending in the transaction,
            # where a later commit could persist it.
            self.__db.rollback()
            raise
        return cid

View File

@ -0,0 +1,11 @@
# typecho api调用
from pytypecho import Post, Typecho
class TypechoXmlRpcPublisher:
    """Publishes posts through the typecho XML-RPC interface via pytypecho."""

    def __init__(self, xmlrpc_url, username, password):
        """
        Create the pytypecho client.

        Args:
            xmlrpc_url: XML-RPC endpoint of the site.
            username: typecho user name.
            password: typecho password.
        """
        self.__typecho = Typecho(xmlrpc_url, username=username, password=password)

    def publish_post(self, title, content):
        """
        Create a post and publish it immediately.

        Args:
            title: Post title.
            content: Post body, passed to pytypecho as the post description.

        Returns:
            Whatever the client's ``new_post`` call returns (the caller prints
            it as the post id).
        """
        return self.__typecho.new_post(
            Post(title=title, description=content),
            publish=True,
        )