钉钉API下载审批附件+评论图片

钉钉API下载审批附件+评论图片

没有Python基础写出来的,见笑了

next_token(分页)的处理,以及下载评论图片、按文件类型下载审批附件,这两部分比较耗费时间。

保存的文件名是:{审批编号}-图片/附件-{序号}

仅使用 企业内部应用开发 获取审批实例ID列表 - 钉钉开放平台

可以直接在影刀中 -> 调用模块,然后写入传入参数即可。注意:list类型请勾选python模式。

更新日志:

2024.12.20:附件文件名的扩展名改为从 fileId→fileType 字典中获取;新增飞书通知;返回值改用 .to_map() 解析;查询范围改为前31天到前一天的工单,然后只保留昨天审批结束的。

import requests

import sys

import json

import os

import datetime

from datetime import datetime

from datetime import datetime, timedelta

from typing import List

from alibabacloud_dingtalk.workflow_1_0.client import Client as dingtalkworkflow_1_0Client

from alibabacloud_tea_openapi import models as open_api_models

from alibabacloud_dingtalk.workflow_1_0 import models as dingtalkworkflow__1__0_models

from alibabacloud_tea_util import models as util_models

from alibabacloud_tea_util.client import Client as UtilClient

def get_dingtalk_access_token(app_key: str = 'XXX', app_secret: str = 'XXX', timeout: float = 10):
    """Request an access token from the DingTalk ``gettoken`` endpoint.

    Args:
        app_key: AppKey of the DingTalk application. The default ``'XXX'`` is
            a placeholder that must be replaced with a real key.
        app_secret: AppSecret of the DingTalk application (placeholder default).
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The ``access_token`` string from the response, or ``None`` when the
        request fails, the status code is not 200, the body is not JSON, or
        the body carries no ``access_token`` field. Failures are printed.
    """
    url = 'https://oapi.dingtalk.com/gettoken'
    params = {
        'appkey': app_key,
        'appsecret': app_secret,
    }
    headers = {
        'Content-Type': 'application/json'
    }

    # This function runs at import time, so a network problem must not raise:
    # report it and fall back to the documented ``None`` result instead.
    try:
        response = requests.get(url, headers=headers, params=params, timeout=timeout)
    except requests.RequestException as err:
        print('请求失败:', err)
        return None

    if response.status_code != 200:
        print('请求失败,状态码:', response.status_code)
        return None

    try:
        data = response.json()
    except ValueError:
        # requests raises a ValueError subclass when the body is not JSON.
        print("获取 access_token 失败:", '响应不是合法的 JSON')
        return None

    if 'access_token' in data:
        return data['access_token']

    print("获取 access_token 失败:", data.get('errmsg', '未知错误'))
    return None

# Fetch the access token once, at import time. Sample.main reads this
# module-level name when it fills in its request headers; the value is None
# when get_dingtalk_access_token() reports a failure.
access_token = get_dingtalk_access_token()

# Debug aid (disabled):
# if access_token:
#     print("获取的 access token:", access_token)

class Sample:
    """Download comment images and form attachments of DingTalk approvals.

    All methods are static. The module-level ``access_token`` is read when
    the DingTalk request headers are built.
    """

    def __init__(self):
        pass

    @staticmethod
    def create_client() -> "dingtalkworkflow_1_0Client":
        """Create the workflow client (HTTPS protocol, ``central`` region).

        Returns:
            A ``dingtalkworkflow_1_0Client`` built from a default config.
        """
        config = open_api_models.Config()
        config.protocol = 'https'
        config.region_id = 'central'
        return dingtalkworkflow_1_0Client(config)

    @staticmethod
    def _report_error(err) -> None:
        """Print an error, using ``code``/``message`` when the exception has them."""
        # Only some exceptions carry ``code`` and ``message``; reading them
        # unconditionally raises AttributeError for KeyError, HTTPError, etc.
        code = getattr(err, 'code', None)
        message = getattr(err, 'message', None)
        if code and message:
            print(f"error code: {code}, error message: {message}")
        else:
            print(repr(err))

    @staticmethod
    def _finished_yesterday(finish_time, today=None) -> bool:
        """Return True when the date part of ``finish_time`` is the day before ``today``.

        ``finish_time`` is a string whose part before ``'T'`` is ``YYYY-MM-DD``;
        a missing or empty value counts as "not finished yesterday".
        ``today`` defaults to the local current date.
        """
        if not finish_time:
            return False
        finished_on = datetime.strptime(finish_time.split('T')[0], "%Y-%m-%d").date()
        if today is None:
            today = datetime.now().date()
        return (today - finished_on).days == 1

    @staticmethod
    def _url_extension(url: str) -> str:
        """Return the file extension of the URL path, ignoring any query string."""
        from urllib.parse import urlsplit

        return os.path.splitext(os.path.basename(urlsplit(url).path))[1]

    @staticmethod
    def _collect_attachment_types(components) -> dict:
        """Map ``fileId`` to ``fileType`` for every attachment found in form values.

        A component contributes only when its ``value`` is a JSON string that
        decodes to a list; dict items of that list holding both ``fileId`` and
        ``fileType`` are recorded. Everything else is skipped silently.
        """
        file_types = {}
        for component in components or []:
            try:
                parsed = json.loads(component.get('value', ''))
            except (TypeError, ValueError):
                # Not a JSON string (plain text, empty, or None): not an attachment field.
                continue
            if not isinstance(parsed, list):
                continue
            for item in parsed:
                if not isinstance(item, dict):
                    continue
                file_id = item.get('fileId')
                file_type = item.get('fileType')
                if file_id and file_type:
                    file_types[file_id] = file_type
        return file_types

    @staticmethod
    def _download(url: str, save_path: str, timeout: float = 60) -> None:
        """Stream ``url`` into ``save_path``; raises on HTTP or I/O errors."""
        with requests.get(url, stream=True, timeout=timeout) as r:
            r.raise_for_status()
            with open(save_path, 'wb') as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)

    @staticmethod
    def _list_instance_ids(client, process_code, start_time, end_time, statuses, user_ids, next_token=0) -> list:
        """Collect instance IDs page by page (10 per page) until no next token is returned."""
        instance_ids = []
        headers = dingtalkworkflow__1__0_models.ListProcessInstanceIdsHeaders()
        headers.x_acs_dingtalk_access_token = access_token
        while True:
            request = dingtalkworkflow__1__0_models.ListProcessInstanceIdsRequest(
                process_code=process_code,
                start_time=start_time,
                end_time=end_time,
                next_token=next_token,
                max_results=10,
                user_ids=user_ids,
                statuses=statuses,
            )
            try:
                response = client.list_process_instance_ids_with_options(
                    request, headers, util_models.RuntimeOptions()
                )
                result = response.body.result
                instance_ids.extend(result.list or [])
                next_token = result.next_token
            except Exception as err:
                # Stop paging on failure: retrying the same page forever would
                # hang the script when the error is persistent (e.g. bad token).
                Sample._report_error(err)
                break
            if not next_token:
                break
        return instance_ids

    @staticmethod
    def _save_instance_files(client, instance, download_path) -> None:
        """Save images and attachments of one instance if it finished yesterday."""
        headers = dingtalkworkflow__1__0_models.GetProcessInstanceHeaders()
        headers.x_acs_dingtalk_access_token = access_token
        request = dingtalkworkflow__1__0_models.GetProcessInstanceRequest(
            process_instance_id=instance
        )
        reply = client.get_process_instance_with_options(
            request, headers, util_models.RuntimeOptions()
        )
        json_data = reply.body.result.to_map()

        if not Sample._finished_yesterday(json_data.get('finishTime')):
            return

        # The approval number is the prefix of every saved file name.
        business_id = json_data['businessId']

        # Comment images: gathered from all operation records, numbered from 1.
        image_urls = []
        for record in json_data.get('operationRecords') or []:
            image_urls.extend(record.get('images') or [])
        for number, image_url in enumerate(image_urls, start=1):
            file_ext = Sample._url_extension(image_url)
            image_name = f"{business_id}-图片-{number}{file_ext}"
            Sample._download(image_url, os.path.join(download_path, image_name))

        # Form attachments: each file needs a download grant before it can be fetched.
        file_types = Sample._collect_attachment_types(json_data.get('formComponentValues'))
        for number, (file_id, file_type) in enumerate(file_types.items(), start=1):
            grant_headers = dingtalkworkflow__1__0_models.GrantProcessInstanceForDownloadFileHeaders()
            grant_headers.x_acs_dingtalk_access_token = access_token
            grant_request = dingtalkworkflow__1__0_models.GrantProcessInstanceForDownloadFileRequest(
                process_instance_id=instance,
                file_id=file_id
            )
            try:
                grant = client.grant_process_instance_for_download_file_with_options(
                    grant_request, grant_headers, util_models.RuntimeOptions()
                )
                file_url = grant.body.result.download_uri
                file_name = f"{business_id}-附件-{number}.{file_type}"
                Sample._download(file_url, os.path.join(download_path, file_name))
            except Exception as err:
                # One failed attachment must not stop the remaining ones.
                Sample._report_error(err)

    @staticmethod
    def _notify_feishu(instance_count: int) -> None:
        """Post the summary text message to the Feishu bot webhook."""
        webhook_url = 'https://open.feishu.cn/open-apis/bot/v2/hook/your-webhook-url'
        message = f"钉钉-付款流程附件保存,今日共保存:{instance_count}个付款流程信息。"
        headers = {
            'Content-Type': 'application/json'
        }
        payload = {
            "msg_type": "text",
            "content": {
                "text": message
            }
        }
        try:
            requests.post(webhook_url, headers=headers, data=json.dumps(payload), timeout=10)
        except requests.RequestException as err:
            # A failed notification should not discard the work already done.
            Sample._report_error(err)

    @staticmethod
    def main(
        process_code: str,
        start_time: int,
        end_time: int,
        statuses: List[str],
        user_ids: List[str],
        download_path: str,
        next_token: str = 0,
        all_instances: List[dict] = None,
    ) -> int:
        """List approval instances, save files of those finished yesterday, notify Feishu.

        Args:
            process_code: Process code of the approval form to query.
            start_time: Start of the query window, passed to the list request.
            end_time: End of the query window, passed to the list request.
            statuses: Instance statuses to include.
            user_ids: User IDs to filter by.
            download_path: Directory that receives the files; created if missing.
            next_token: Paging token for the first list request.
            all_instances: Accepted for backward compatibility; the value is
                ignored and the list is always rebuilt from the API.

        Returns:
            The number of instance IDs returned by the list API.
        """
        client = Sample.create_client()

        all_instances = Sample._list_instance_ids(
            client, process_code, start_time, end_time, statuses, user_ids, next_token
        )

        if download_path:
            os.makedirs(download_path, exist_ok=True)

        for instance in all_instances:
            try:
                Sample._save_instance_files(client, instance, download_path)
            except Exception as err:
                # Best effort: report and continue with the next instance.
                Sample._report_error(err)

        # NOTE(review): this counts every listed instance in the query window,
        # not only those whose files were saved — confirm that this is the
        # number the notification and the return value are meant to carry.
        Sample._notify_feishu(len(all_instances))
        return len(all_instances)

#启动函数

def start(process_code, statuses, user_ids, download_path):
    """Run Sample.main over the window from 31 days ago up to the end of yesterday.

    The window opens at local midnight 31 days before today and closes at
    23:59:59.999999 yesterday; both bounds are passed as millisecond
    timestamps. Returns whatever Sample.main returns.
    """
    current = datetime.now()

    # Lower bound: today's midnight moved back 31 days.
    midnight_today = current.replace(hour=0, minute=0, second=0, microsecond=0)
    window_open = midnight_today - timedelta(days=31)

    # Upper bound: the last microsecond of yesterday.
    window_close = (current - timedelta(days=1)).replace(
        hour=23, minute=59, second=59, microsecond=999999
    )

    start_time = int(window_open.timestamp() * 1000)
    end_time = int(window_close.timestamp() * 1000)

    return Sample.main(process_code, start_time, end_time, statuses, user_ids, download_path)

if __name__ == '__main__':
    # Example run: "PROC-XXX" is a placeholder for a real process code.
    process_code = "PROC-XXX"
    statuses = ['COMPLETED']
    user_ids = []
    # Built with os.path.join instead of the literal ".\download": that
    # literal relied on the invalid escape sequence "\d" and only named a
    # sub-directory on Windows.
    download_path = os.path.join(".", "download")
    start(process_code, statuses, user_ids, download_path)

完成!

相关推荐