diff --git a/notify.py b/notify.py index 4e7efa7..1eca758 100644 --- a/notify.py +++ b/notify.py @@ -9,9 +9,20 @@ import re import threading import time import urllib.parse +import smtplib +from email.mime.text import MIMEText +from email.header import Header +from email.utils import formataddr import requests +import smtplib +from email.mime.text import MIMEText + +msg_from = '785085670@qq.com' # 发送方邮箱地址。 +password = 'rapagfoymtiwbbac' # 发送方QQ邮箱授权码,不是QQ邮箱密码。 +msg_to = '785085670@qq.com' # 收件人邮箱地址。 + # 原先的 print 函数和主线程的锁 _print = print mutex = threading.Lock() @@ -35,6 +46,9 @@ push_config = { 'BARK_ARCHIVE': '', # bark 推送是否存档 'BARK_GROUP': '', # bark 推送分组 'BARK_SOUND': '', # bark 推送声音 + 'BARK_ICON': '', # bark 推送图标 + 'BARK_LEVEL': '', # bark 推送时效性 + 'BARK_URL': '', # bark 推送跳转URL 'CONSOLE': True, # 控制台输出 @@ -44,11 +58,11 @@ push_config = { 'FSKEY': '', # 飞书机器人的 FSKEY 'GOBOT_URL': '', # go-cqhttp - # 推送到个人QQ:http://127.0.0.1/send_private_msg - # 群:http://127.0.0.1/send_group_msg + # 推送到个人QQ:http://127.0.0.1/send_private_msg + # 群:http://127.0.0.1/send_group_msg 'GOBOT_QQ': '', # go-cqhttp 的推送群或用户 - # GOBOT_URL 设置 /send_private_msg 时填入 user_id=个人QQ - # /send_group_msg 时填入 group_id=QQ群 + # GOBOT_URL 设置 /send_private_msg 时填入 user_id=个人QQ + # /send_group_msg 时填入 group_id=QQ群 'GOBOT_TOKEN': '', # go-cqhttp 的 access_token 'GOTIFY_URL': '', # gotify地址,如https://push.example.de:8080 @@ -59,12 +73,20 @@ push_config = { 'PUSH_KEY': '', # server 酱的 PUSH_KEY,兼容旧版与 Turbo 版 + 'DEER_KEY': '', # PushDeer 的 PUSHDEER_KEY + 'DEER_URL': '', # PushDeer 的 PUSHDEER_URL + + 'CHAT_URL': '', # synology chat url + 'CHAT_TOKEN': '', # synology chat token + 'PUSH_PLUS_TOKEN': '', # push+ 微信推送的用户令牌 'PUSH_PLUS_USER': '', # push+ 微信推送的群组编码 'QMSG_KEY': '', # qmsg 酱的 QMSG_KEY 'QMSG_TYPE': '', # qmsg 酱的 QMSG_TYPE + 'QYWX_ORIGIN': '', # 企业微信代理地址 + 'QYWX_AM': '', # 企业微信应用 'QYWX_KEY': '', # 企业微信机器人 @@ -75,6 +97,28 @@ push_config = { 'TG_PROXY_AUTH': '', # tg 代理认证参数 'TG_PROXY_HOST': '', # tg 机器人的 TG_PROXY_HOST 'TG_PROXY_PORT': '', # tg 机器人的 TG_PROXY_PORT + + 'AIBOTK_KEY': '', # 智能微秘书 个人中心的apikey 文档地址:http://wechat.aibotk.com/docs/about + 'AIBOTK_TYPE': '', # 智能微秘书 发送目标 room 或 contact + 'AIBOTK_NAME': '', # 智能微秘书 发送群名 或者好友昵称和type要对应好 + + 'SMTP_SERVER': '', # SMTP 发送邮件服务器,形如 smtp.exmail.qq.com:465 + 'SMTP_SSL': 'false', # SMTP 发送邮件服务器是否使用 SSL,填写 true 或 false + 'SMTP_EMAIL': '', # SMTP 收发件邮箱,通知将会由自己发给自己 + 'SMTP_PASSWORD': '', # SMTP 登录密码,也可能为特殊口令,视具体邮件服务商说明而定 + 'SMTP_NAME': '', # SMTP 收发件人姓名,可随意填写 + + 'PUSHME_KEY': '', # PushMe 酱的 PUSHME_KEY + + 'CHRONOCAT_QQ': '', # qq号 + 'CHRONOCAT_TOKEN': '', # CHRONOCAT 的token + 'CHRONOCAT_URL': '', # CHRONOCAT的url地址 + + 'WEBHOOK_URL': '', # 自定义通知 请求地址 + 'WEBHOOK_BODY': '', # 自定义通知 请求体 + 'WEBHOOK_HEADERS': '', # 自定义通知 请求头 + 'WEBHOOK_METHOD': '', # 自定义通知 请求方法 + 'WEBHOOK_CONTENT_TYPE': '' # 自定义通知 content-type } notify_function = [] # fmt: on @@ -104,14 +148,17 @@ def bark(title: str, content: str) -> None: "BARK_ARCHIVE": "isArchive", "BARK_GROUP": "group", "BARK_SOUND": "sound", + "BARK_ICON": "icon", + "BARK_LEVEL": "level", + "BARK_URL": "url", } params = "" for pair in filter( - lambda pairs: pairs[0].startswith("BARK_") - and pairs[0] != "BARK_PUSH" - and pairs[1] - and bark_params.get(pairs[0]), - push_config.items(), + lambda pairs: pairs[0].startswith("BARK_") + and pairs[0] != "BARK_PUSH" + and pairs[1] + and bark_params.get(pairs[0]), + push_config.items(), ): params += f"{bark_params.get(pair[0])}={pair[1]}&" if params: @@ -142,8 +189,7 @@ def dingding_bot(title: str, content: str) -> None: timestamp = str(round(time.time() * 1000)) secret_enc = push_config.get("DD_BOT_SECRET").encode("utf-8") - string_to_sign = "{}\n{}".format( - timestamp, push_config.get("DD_BOT_SECRET")) + string_to_sign = "{}\n{}".format(timestamp, push_config.get("DD_BOT_SECRET")) string_to_sign_enc = string_to_sign.encode("utf-8") hmac_code = hmac.new( secret_enc, string_to_sign_enc, digestmod=hashlib.sha256 @@ -209,8 +255,11 @@ def gotify(title: str, content: str) -> None: print("gotify 服务启动") url = f'{push_config.get("GOTIFY_URL")}/message?token={push_config.get("GOTIFY_TOKEN")}' - data = {"title": title, "message": content, - "priority": push_config.get("GOTIFY_PRIORITY")} + data = { + "title": title, + "message": content, + "priority": push_config.get("GOTIFY_PRIORITY"), + } response = requests.post(url, data=data).json() if response.get("id"): @@ -249,10 +298,10 @@ def serverJ(title: str, content: str) -> None: print("serverJ 服务启动") data = {"text": title, "desp": content.replace("\n", "\n\n")} - if push_config.get("PUSH_KEY").index("SCT") != -1: + if push_config.get("PUSH_KEY").find("SCT") != -1: url = f'https://sctapi.ftqq.com/{push_config.get("PUSH_KEY")}.send' else: - url = f'https://sc.ftqq.com/${push_config.get("PUSH_KEY")}.send' + url = f'https://sc.ftqq.com/{push_config.get("PUSH_KEY")}.send' response = requests.post(url, data=data).json() if response.get("errno") == 0 or response.get("code") == 0: @@ -261,6 +310,50 @@ def serverJ(title: str, content: str) -> None: print(f'serverJ 推送失败!错误码:{response["message"]}') +def pushdeer(title: str, content: str) -> None: + """ + 通过PushDeer 推送消息 + """ + if not push_config.get("DEER_KEY"): + print("PushDeer 服务的 DEER_KEY 未设置!!\n取消推送") + return + print("PushDeer 服务启动") + data = { + "text": title, + "desp": content, + "type": "markdown", + "pushkey": push_config.get("DEER_KEY"), + } + url = "https://api2.pushdeer.com/message/push" + if push_config.get("DEER_URL"): + url = push_config.get("DEER_URL") + + response = requests.post(url, data=data).json() + + if len(response.get("content").get("result")) > 0: + print("PushDeer 推送成功!") + else: + print("PushDeer 推送失败!错误信息:", response) + + +def chat(title: str, content: str) -> None: + """ + 通过Chat 推送消息 + """ + if not push_config.get("CHAT_URL") or not push_config.get("CHAT_TOKEN"): + print("chat 服务的 CHAT_URL或CHAT_TOKEN 未设置!!\n取消推送") + return + print("chat 服务启动") + data = "payload=" + json.dumps({"text": title + "\n" + content}) + url = push_config.get("CHAT_URL") + push_config.get("CHAT_TOKEN") + response = requests.post(url, data=data) + + if response.status_code == 200: + print("Chat 推送成功!") + else: + print("Chat 推送失败!错误信息:", response) + + def pushplus_bot(title: str, content: str) -> None: """ 通过 push+ 推送消息。 @@ -285,11 +378,9 @@ def pushplus_bot(title: str, content: str) -> None: print("PUSHPLUS 推送成功!") else: - url_old = "http://pushplus.hxtrip.com/send" headers["Accept"] = "application/json" - response = requests.post( - url=url_old, data=body, headers=headers).json() + response = requests.post(url=url_old, data=body, headers=headers).json() if response["code"] == 200: print("PUSHPLUS(hxtrip) 推送成功!") @@ -308,8 +399,7 @@ def qmsg_bot(title: str, content: str) -> None: print("qmsg 服务启动") url = f'https://qmsg.zendee.cn/{push_config.get("QMSG_TYPE")}/{push_config.get("QMSG_KEY")}' - payload = { - "msg": f'{title}\n\n{content.replace("----", "-")}'.encode("utf-8")} + payload = {"msg": f'{title}\n\n{content.replace("----", "-")}'.encode("utf-8")} response = requests.post(url=url, params=payload).json() if response["code"] == 0: @@ -358,9 +448,12 @@ class WeCom: self.CORPID = corpid self.CORPSECRET = corpsecret self.AGENTID = agentid + self.ORIGIN = "https://qyapi.weixin.qq.com" + if push_config.get("QYWX_ORIGIN"): + self.ORIGIN = push_config.get("QYWX_ORIGIN") def get_access_token(self): - url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken" + url = f"{self.ORIGIN}/cgi-bin/gettoken" values = { "corpid": self.CORPID, "corpsecret": self.CORPSECRET, @@ -371,8 +464,7 @@ class WeCom: def send_text(self, message, touser="@all"): send_url = ( - "https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=" - + self.get_access_token() + f"{self.ORIGIN}/cgi-bin/message/send?access_token={self.get_access_token()}" ) send_values = { "touser": touser, @@ -388,8 +480,7 @@ class WeCom: def send_mpnews(self, title, message, media_id, touser="@all"): send_url = ( - "https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=" - + self.get_access_token() + f"{self.ORIGIN}/cgi-bin/message/send?access_token={self.get_access_token()}" ) send_values = { "touser": touser, @@ -423,7 +514,11 @@ def wecom_bot(title: str, content: str) -> None: return print("企业微信机器人服务启动") - url = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={push_config.get('QYWX_KEY')}" + origin = "https://qyapi.weixin.qq.com" + if push_config.get("QYWX_ORIGIN"): + origin = push_config.get("QYWX_ORIGIN") + + url = f"{origin}/cgi-bin/webhook/send?key={push_config.get('QYWX_KEY')}" headers = {"Content-Type": "application/json;charset=utf-8"} data = {"msgtype": "text", "text": {"content": f"{title}\n\n{content}"}} response = requests.post( @@ -460,12 +555,12 @@ def telegram_bot(title: str, content: str) -> None: proxies = None if push_config.get("TG_PROXY_HOST") and push_config.get("TG_PROXY_PORT"): if push_config.get("TG_PROXY_AUTH") is not None and "@" not in push_config.get( - "TG_PROXY_HOST" + "TG_PROXY_HOST" ): push_config["TG_PROXY_HOST"] = ( - push_config.get("TG_PROXY_AUTH") - + "@" - + push_config.get("TG_PROXY_HOST") + push_config.get("TG_PROXY_AUTH") + + "@" + + push_config.get("TG_PROXY_HOST") ) proxyStr = "http://{}:{}".format( push_config.get("TG_PROXY_HOST"), push_config.get("TG_PROXY_PORT") @@ -481,6 +576,268 @@ def telegram_bot(title: str, content: str) -> None: print("tg 推送失败!") +def aibotk(title: str, content: str) -> None: + """ + 使用 智能微秘书 推送消息。 + """ + if ( + not push_config.get("AIBOTK_KEY") + or not push_config.get("AIBOTK_TYPE") + or not push_config.get("AIBOTK_NAME") + ): + print("智能微秘书 的 AIBOTK_KEY 或者 AIBOTK_TYPE 或者 AIBOTK_NAME 未设置!!\n取消推送") + return + print("智能微秘书 服务启动") + + if push_config.get("AIBOTK_TYPE") == "room": + url = "https://api-bot.aibotk.com/openapi/v1/chat/room" + data = { + "apiKey": push_config.get("AIBOTK_KEY"), + "roomName": push_config.get("AIBOTK_NAME"), + "message": {"type": 1, "content": f"【青龙快讯】\n\n{title}\n{content}"}, + } + else: + url = "https://api-bot.aibotk.com/openapi/v1/chat/contact" + data = { + "apiKey": push_config.get("AIBOTK_KEY"), + "name": push_config.get("AIBOTK_NAME"), + "message": {"type": 1, "content": f"【青龙快讯】\n\n{title}\n{content}"}, + } + body = json.dumps(data).encode(encoding="utf-8") + headers = {"Content-Type": "application/json"} + response = requests.post(url=url, data=body, headers=headers).json() + print(response) + if response["code"] == 0: + print("智能微秘书 推送成功!") + else: + print(f'智能微秘书 推送失败!{response["error"]}') + + +def smtp(title: str, content: str) -> None: + """ + 使用 SMTP 邮件 推送消息。 + """ + if ( + not push_config.get("SMTP_SERVER") + or not push_config.get("SMTP_SSL") + or not push_config.get("SMTP_EMAIL") + or not push_config.get("SMTP_PASSWORD") + or not push_config.get("SMTP_NAME") + ): + print( + "SMTP 邮件 的 SMTP_SERVER 或者 SMTP_SSL 或者 SMTP_EMAIL 或者 SMTP_PASSWORD 或者 SMTP_NAME 未设置!!\n取消推送" + ) + return + print("SMTP 邮件 服务启动") + + message = MIMEText(content, "plain", "utf-8") + message["From"] = formataddr( + ( + Header(push_config.get("SMTP_NAME"), "utf-8").encode(), + push_config.get("SMTP_EMAIL"), + ) + ) + message["To"] = formataddr( + ( + Header(push_config.get("SMTP_NAME"), "utf-8").encode(), + push_config.get("SMTP_EMAIL"), + ) + ) + message["Subject"] = Header(title, "utf-8") + + try: + smtp_server = ( + smtplib.SMTP_SSL(push_config.get("SMTP_SERVER")) + if push_config.get("SMTP_SSL") == "true" + else smtplib.SMTP(push_config.get("SMTP_SERVER")) + ) + smtp_server.login( + push_config.get("SMTP_EMAIL"), push_config.get("SMTP_PASSWORD") + ) + smtp_server.sendmail( + push_config.get("SMTP_EMAIL"), + push_config.get("SMTP_EMAIL"), + message.as_bytes(), + ) + smtp_server.close() + print("SMTP 邮件 推送成功!") + except Exception as e: + print(f"SMTP 邮件 推送失败!{e}") + + +def pushme(title: str, content: str) -> None: + """ + 使用 PushMe 推送消息。 + """ + if not push_config.get("PUSHME_KEY"): + print("PushMe 服务的 PUSHME_KEY 未设置!!\n取消推送") + return + print("PushMe 服务启动") + + url = f'https://push.i-i.me/?push_key={push_config.get("PUSHME_KEY")}' + data = { + "title": title, + "content": content, + } + response = requests.post(url, data=data) + + if response.status_code == 200 and response.text == "success": + print("PushMe 推送成功!") + else: + print(f"PushMe 推送失败!{response.status_code} {response.text}") + + +def chronocat(title: str, content: str) -> None: + """ + 使用 CHRONOCAT 推送消息。 + """ + if ( + not push_config.get("CHRONOCAT_URL") + or not push_config.get("CHRONOCAT_QQ") + or not push_config.get("CHRONOCAT_TOKEN") + ): + print("CHRONOCAT 服务的 CHRONOCAT_URL 或 CHRONOCAT_QQ 未设置!!\n取消推送") + return + + print("CHRONOCAT 服务启动") + + user_ids = re.findall(r"user_id=(\d+)", push_config.get("CHRONOCAT_QQ")) + group_ids = re.findall(r"group_id=(\d+)", push_config.get("CHRONOCAT_QQ")) + + url = f'{push_config.get("CHRONOCAT_URL")}/api/message/send' + headers = { + "Content-Type": "application/json", + "Authorization": f'Bearer {push_config.get("CHRONOCAT_TOKEN")}', + } + + for chat_type, ids in [(1, user_ids), (2, group_ids)]: + if not ids: + continue + for chat_id in ids: + data = { + "peer": {"chatType": chat_type, "peerUin": chat_id}, + "elements": [ + { + "elementType": 1, + "textElement": {"content": f"{title}\n\n{content}"}, + } + ], + } + response = requests.post(url, headers=headers, data=json.dumps(data)) + if response.status_code == 200: + if chat_type == 1: + print(f"QQ个人消息:{ids}推送成功!") + else: + print(f"QQ群消息:{ids}推送成功!") + else: + if chat_type == 1: + print(f"QQ个人消息:{ids}推送失败!") + else: + print(f"QQ群消息:{ids}推送失败!") + + +def parse_headers(headers): + if not headers: + return {} + + parsed = {} + lines = headers.split("\n") + + for line in lines: + i = line.find(":") + if i == -1: + continue + + key = line[:i].strip().lower() + val = line[i + 1 :].strip() + parsed[key] = parsed.get(key, "") + ", " + val if key in parsed else val + + return parsed + + +def parse_body(body, content_type): + if not body: + return "" + + parsed = {} + lines = body.split("\n") + + for line in lines: + i = line.find(":") + if i == -1: + continue + + key = line[:i].strip().lower() + val = line[i + 1 :].strip() + + if not key or key in parsed: + continue + + try: + json_value = json.loads(val) + parsed[key] = json_value + except: + parsed[key] = val + + if content_type == "application/x-www-form-urlencoded": + data = urlencode(parsed, doseq=True) + return data + + if content_type == "application/json": + data = json.dumps(parsed) + return data + + return parsed + + +def format_notify_content(url, body, title, content): + if "$title" not in url and "$title" not in body: + return {} + + formatted_url = url.replace("$title", urllib.parse.quote_plus(title)).replace( + "$content", urllib.parse.quote_plus(content) + ) + formatted_body = body.replace("$title", title).replace("$content", content) + + return formatted_url, formatted_body + + +def custom_notify(title: str, content: str) -> None: + """ + 通过 自定义通知 推送消息。 + """ + if not push_config.get("WEBHOOK_URL") or not push_config.get("WEBHOOK_METHOD"): + print("自定义通知的 WEBHOOK_URL 或 WEBHOOK_METHOD 未设置!!\n取消推送") + return + + print("自定义通知服务启动") + + WEBHOOK_URL = push_config.get("WEBHOOK_URL") + WEBHOOK_METHOD = push_config.get("WEBHOOK_METHOD") + WEBHOOK_CONTENT_TYPE = push_config.get("WEBHOOK_CONTENT_TYPE") + WEBHOOK_BODY = push_config.get("WEBHOOK_BODY") + WEBHOOK_HEADERS = push_config.get("WEBHOOK_HEADERS") + + formatUrl, formatBody = format_notify_content( + WEBHOOK_URL, WEBHOOK_BODY, title, content + ) + + if not formatUrl and not formatBody: + print("请求头或者请求体中必须包含 $title 和 $content") + return + + headers = parse_headers(WEBHOOK_HEADERS) + body = parse_body(formatBody, WEBHOOK_CONTENT_TYPE) + response = requests.request( + method=WEBHOOK_METHOD, url=formatUrl, headers=headers, timeout=15, data=body + ) + + if response.status_code == 200: + print("自定义通知推送成功!") + else: + print(f"自定义通知推送失败!{response.status_code} {response.text}") + + def one() -> str: """ 获取一条一言。 @@ -507,6 +864,10 @@ if push_config.get("IGOT_PUSH_KEY"): notify_function.append(iGot) if push_config.get("PUSH_KEY"): notify_function.append(serverJ) +if push_config.get("DEER_KEY"): + notify_function.append(pushdeer) +if push_config.get("CHAT_URL") and push_config.get("CHAT_TOKEN"): + notify_function.append(chat) if push_config.get("PUSH_PLUS_TOKEN"): notify_function.append(pushplus_bot) if push_config.get("QMSG_KEY") and push_config.get("QMSG_TYPE"): @@ -517,6 +878,30 @@ if push_config.get("QYWX_KEY"): notify_function.append(wecom_bot) if push_config.get("TG_BOT_TOKEN") and push_config.get("TG_USER_ID"): notify_function.append(telegram_bot) +if ( + push_config.get("AIBOTK_KEY") + and push_config.get("AIBOTK_TYPE") + and push_config.get("AIBOTK_NAME") +): + notify_function.append(aibotk) +if ( + push_config.get("SMTP_SERVER") + and push_config.get("SMTP_SSL") + and push_config.get("SMTP_EMAIL") + and push_config.get("SMTP_PASSWORD") + and push_config.get("SMTP_NAME") +): + notify_function.append(smtp) +if push_config.get("PUSHME_KEY"): + notify_function.append(pushme) +if ( + push_config.get("CHRONOCAT_URL") + and push_config.get("CHRONOCAT_QQ") + and push_config.get("CHRONOCAT_TOKEN") +): + notify_function.append(chronocat) +if push_config.get("WEBHOOK_URL") and push_config.get("WEBHOOK_METHOD"): + notify_function.append(custom_notify) def send(title: str, content: str) -> None: @@ -524,19 +909,43 @@ def send(title: str, content: str) -> None: print(f"{title} 推送内容为空!") return + # 根据标题跳过一些消息推送,环境变量:SKIP_PUSH_TITLE 用回车分隔 + skipTitle = os.getenv("SKIP_PUSH_TITLE") + if skipTitle: + if title in re.split("\n", skipTitle): + print(f"{title} 在SKIP_PUSH_TITLE环境变量内,跳过推送!") + return + hitokoto = push_config.get("HITOKOTO") text = one() if hitokoto else "" content += "\n\n" + text ts = [ - threading.Thread(target=mode, args=( - title, content), name=mode.__name__) + threading.Thread(target=mode, args=(title, content), name=mode.__name__) for mode in notify_function ] [t.start() for t in ts] [t.join() for t in ts] +def sendEmail(subject, content): + msg = MIMEText(content, 'plain', 'utf-8') + msg['Subject'] = subject + msg['From'] = msg_from + msg['To'] = msg_to + try: + client = smtplib.SMTP_SSL('smtp.qq.com', smtplib.SMTP_SSL_PORT) + print("连接到邮件服务器成功") + + client.login(msg_from, password) + print("登录成功") + + client.sendmail(msg_from, msg_to, msg.as_string()) + print("发送成功") + except smtplib.SMTPException as e: + print("发送邮件异常") + finally: + client.quit() def main(): send("title", "content")