Automating Slack Channel Creation and Message Import

1. Export Channel Content: Export the channel's data from the workspace's export settings. This requires workspace admin privileges.

2. Create an API Token and Add OAuth Scopes:

channels:history
groups:history
im:history
mpim:history
channels:write
groups:write
chat:write
users:read
channels:read
groups:read
im:write
im:read

3. Run the Script to Create the Channel and Import the Messages:

import json
import os
import requests
import time

# Slack API Token (paste the token created in step 2; avoid committing it)
token = ''

# Endpoint, headers and request body for creating the destination channel
url_create = 'https://slack.com/api/conversations.create'
headers = {
    'Content-Type': 'application/json',  # Request bodies are sent as JSON
    'Authorization': f'Bearer {token}'  # Use Bearer token for authorization
}
payload_create = {
    'name': 'new-channel-name'  # Specify the new channel name
}

# Create the channel. The reply body is decoded once and reused, instead of
# re-parsing the JSON for every field access.
response_create = requests.post(url_create, json=payload_create, headers=headers)
create_result = response_create.json()
if create_result.get('ok'):
    new_channel_id = create_result['channel']['id']
    print(f"New channel created with ID: {new_channel_id}")  # Print the new channel ID
else:
    print('Failed to create channel:', create_result)  # Show the full error reply
    # Exit with a non-zero status so a shell or scheduler can detect the
    # failure; the bare exit() helper would report success (status 0).
    raise SystemExit(1)

# Look up the ID of the user that owns the token. It is read later by
# read_and_send_messages so that this user is not invited to the channel.
url_auth_test = 'https://slack.com/api/auth.test'
response_auth_test = requests.post(url_auth_test, headers=headers)
auth_result = response_auth_test.json()
if auth_result.get('ok'):
    current_user_id = auth_result['user_id']
else:
    print('Failed to get current user info:', auth_result)
    raise SystemExit(1)


# Helper and entry point for replaying an exported channel into Slack
def _post_to_slack(url, payload, headers, max_retries=3):
    """POST *payload* as JSON to a Slack Web API *url* and return the decoded reply.

    When Slack answers with HTTP 429 (rate limited), wait for the number of
    seconds given in the ``Retry-After`` header and try again, up to
    *max_retries* extra attempts, so a throttled request is not simply lost.
    The decoded JSON body of the last response is returned either way.
    """
    for attempt in range(max_retries + 1):
        response = requests.post(url, json=payload, headers=headers)
        if response.status_code != 429 or attempt == max_retries:
            break
        try:
            delay = int(response.headers.get('Retry-After', 1))
        except ValueError:
            delay = 1  # Unparseable header: fall back to a short pause
        time.sleep(delay)
    return response.json()


def read_and_send_messages(directory, channel_id, token):
    """Invite the original authors to a channel and re-post exported messages.

    Every ``*.json`` file in *directory* is loaded once; each file is expected
    to hold a list of message objects. Files are handled in sorted filename
    order (Slack exports are expected to name them by date, so this should
    keep the replay chronological -- confirm against your export).

    Args:
        directory: Path of the folder holding the channel's exported JSON files.
        channel_id: ID of the channel that receives invitations and messages.
        token: Slack API token, sent as a Bearer credential.

    Returns:
        None. Progress and failures are reported with ``print``.

    Note:
        Reads the module-level ``current_user_id`` to avoid inviting the user
        that owns the token.
    """
    headers = {
        'Content-Type': 'application/json',  # Request bodies are sent as JSON
        'Authorization': f'Bearer {token}'  # Use Bearer token for authorization
    }
    url_post_message = 'https://slack.com/api/chat.postMessage'  # URL for posting messages
    url_invite = 'https://slack.com/api/conversations.invite'  # URL for inviting members

    # os.listdir returns entries in arbitrary order; sort so the messages are
    # replayed in a stable, filename-based order.
    export_files = sorted(
        name for name in os.listdir(directory) if name.endswith('.json')
    )

    # Parse each file a single time and reuse the data for both passes below.
    exported = []
    for filename in export_files:
        filepath = os.path.join(directory, filename)
        with open(filepath, 'r', encoding='utf-8') as file:
            exported.append((filename, json.load(file)))

    # Collect the distinct authors, skipping the token owner and Slackbot.
    members = set()
    for _, messages in exported:
        for message in messages:
            if 'user' not in message:
                continue
            user_id = message['user']
            if user_id != current_user_id and user_id != 'USLACKBOT':
                members.add(user_id)

    # Invite members to the new channel, one request per member so that a
    # failure can be reported for the specific user.
    for member in members:
        payload_invite = {
            'channel': channel_id,
            'users': member
        }
        invite_result = _post_to_slack(url_invite, payload_invite, headers)
        if not invite_result.get('ok'):
            print(f"Failed to invite member {member}: {invite_result.get('error')}")
        else:
            print(f"Successfully invited member {member}")
        time.sleep(1)  # Prevent hitting API rate limits

    # Send messages
    for filename, messages in exported:
        for message in messages:
            if 'text' not in message:
                # Nothing to post, so no API call and no rate-limit pause.
                print(f"Message in {filename} does not contain 'text' field.")
                continue
            payload_message = {
                'channel': channel_id,
                'text': message['text'],
            }
            message_result = _post_to_slack(url_post_message, payload_message, headers)
            if not message_result.get('ok'):
                print(f"Failed to send message from {filename}: {message_result.get('error')}")
            time.sleep(1)  # Prevent hitting API rate limits

    print('Data import and member invitation completed.')


# Folder from the unpacked Slack export that holds this channel's JSON files
directory = r'C:\slackBackup\xxx Slack export Mar 13 2024 - Jun 7 2024\channel-name'

# Invite the original authors and replay the exported messages into the
# channel created above
read_and_send_messages(
    directory=directory,
    channel_id=new_channel_id,
    token=token,
)

留下评论

通过 WordPress.com 设计一个这样的站点
从这里开始