vk-toot/vk_toot.py

225 lines
7.8 KiB
Python
Raw Normal View History

2022-07-07 09:01:14 +03:00
# Version string of the vk-toot bridge script.
__version__ = '0.2.1'
2022-05-21 02:27:14 +03:00
import os
2022-07-07 09:01:14 +03:00
import sys
2022-05-21 02:27:14 +03:00
import json
import time
2022-07-07 09:01:14 +03:00
import datetime
import logging
2022-06-07 18:17:58 +03:00
import shutil
2022-07-07 09:01:14 +03:00
import urllib.parse
2022-06-07 18:17:58 +03:00
2022-05-21 02:27:14 +03:00
import requests
2022-08-28 16:53:36 +03:00
import toml
2022-05-21 02:27:14 +03:00
2022-08-28 16:53:36 +03:00
# Load the bridge configuration once at import time.
# TOML documents are UTF-8 by specification, so the encoding is given
# explicitly instead of relying on the platform's locale default.
with open('./data/config.toml', 'r', encoding='utf-8') as file:
    config = toml.loads(file.read())

# Mastodon connection settings ([mastodon] section).
MASTODON_API_URL = config['mastodon']['API_URL']
MASTODON_API_ACCESS_TOKEN = config['mastodon']['API_ACCESS_TOKEN']

# VK connection settings ([vk] section).
VK_API_URL = config['vk']['API_URL']
VK_API_VERSION = config['vk']['API_VERSION']
VK_API_ACCESS_TOKEN = config['vk']['API_ACCESS_TOKEN']
VK_GROUP_DOMAIN = config['vk']['GROUP_DOMAIN']
2022-05-21 02:27:14 +03:00
2022-07-07 09:01:14 +03:00
# Configure the module logger: INFO and above, written to stdout with a
# '[timestamp: LEVEL] message' prefix.
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(
    logging.Formatter('[%(asctime)s: %(levelname)s] %(message)s')
)

log = logging.getLogger('vk_toot')
log.setLevel(logging.INFO)
log.addHandler(handler)
2022-07-07 09:01:14 +03:00
2022-05-21 02:27:14 +03:00
def get_vk_group_last_post(timeout: float = 30) -> dict:
    """Return dict with VK group last post data.

    Calls the VK ``wall.get`` method for the configured group domain and
    asks for a single item.

    Args:
        timeout: Seconds to wait for the VK API before giving up. Without
            a timeout a stalled connection would block the caller forever.

    Returns:
        The decoded JSON body of the VK API response.
    """
    # Let requests build the query string so every value is URL-encoded.
    params = {
        'v': VK_API_VERSION,
        'access_token': VK_API_ACCESS_TOKEN,
        'domain': VK_GROUP_DOMAIN,
        'count': 1,
    }
    response = requests.get(
        VK_API_URL + '/wall.get', params=params, timeout=timeout)
    return json.loads(response.text)
def get_vk_post_text(post_data: dict) -> str:
    """Return the text of the first post in a ``wall.get`` response.

    See: https://dev.vk.com/reference/objects/post
    """
    first_post = post_data['response']['items'][0]
    return first_post['text']
def get_vk_post_url(post_data: dict) -> str:
    """Build the vk.com link to the original post.

    The link has the form ``https://vk.com/wall<owner_id>_<id>``, taken
    from the first post in the response.
    See: https://dev.vk.com/reference/objects/post
    """
    post = post_data['response']['items'][0]
    return 'https://vk.com/wall{}_{}'.format(post['owner_id'], post['id'])
def get_vk_post_attachments(post_data: dict) -> list:
    """Extract the supported attachments of the first post.

    Only ``photo`` and ``album`` attachments are converted; every other
    attachment type is ignored. See attachments at
    https://dev.vk.com/method/wall.post

    Args:
        post_data: Decoded ``wall.get`` response; only the first item is
            inspected.

    Returns:
        A list of single-key dicts in the post's attachment order::

            [{'photo': 'url'}, {'album': 'url'}]

        The list is empty when the post has no ``attachments`` field.
    """
    attachments = []
    # A post without the 'attachments' key is treated as having none
    # instead of raising KeyError.
    raw_attachments = post_data['response']['items'][0].get('attachments', [])
    for attachment in raw_attachments:
        kind = attachment['type']
        if kind == 'photo':
            sizes = attachment['photo']['sizes']
            if not sizes:
                # Nothing to link to; skip rather than reuse a URL left
                # over from a previous photo.
                continue
            # Get photo in max size by height (photos are proportionally
            # resized). Iterating in reverse keeps the last entry when
            # several sizes share the maximum height.
            largest = max(reversed(sizes), key=lambda size: size['height'])
            attachments.append({'photo': largest['url']})
        elif kind == 'album':
            album = attachment['album']
            album_url = 'https://vk.com/album{}_{}'.format(
                album['owner_id'], album['id'])
            attachments.append({'album': album_url})
    return attachments
2022-06-07 18:17:58 +03:00
def download_file(url: str, timeout: float = 30) -> str:
    """Save file to /tmp. Return file path.

    The local name is the last path component of *url*.

    Args:
        url: Address of the file to download.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        Path of the downloaded file under ``/tmp``.

    Raises:
        requests.HTTPError: The server answered with an error status, so
            no file is written instead of saving the error page.
    """
    filename = '/tmp/' + os.path.basename(urllib.parse.urlparse(url).path)
    # The context manager releases the streamed connection even when the
    # copy fails part-way through.
    with requests.get(url, stream=True, timeout=timeout) as response:
        response.raise_for_status()
        with open(filename, 'wb') as out_file:
            shutil.copyfileobj(response.raw, out_file)
    return filename
def post_media(file: str, timeout: float = 30) -> 'requests.Response':
    """Upload media file to Mastodon.

    After the upload the function sleeps for ``[bridge].REQUEST_DELAY``
    seconds (1 second when the option is missing or not a number).

    Args:
        file: Path of the local file to upload.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The response object of the upload request.
    """
    headers = {'Authorization': 'Bearer ' + MASTODON_API_ACCESS_TOKEN}
    # Close the file handle as soon as the request is done; callers
    # remove the file right after this function returns.
    with open(file, 'rb') as media:
        response = requests.post(
            MASTODON_API_URL + '/media',
            files={'file': media},
            headers=headers,
            timeout=timeout,
        )
    log.info('Post media on Mastodon. Response: %s %s',
             response.status_code, response.text)
    # Sleep some seconds to prevent HTTP 429 response.
    try:
        request_delay = int(config['bridge']['REQUEST_DELAY'])
    except (KeyError, TypeError, ValueError):
        request_delay = 1
    time.sleep(request_delay)
    return response
def publish_toot(post_text: str, media_ids: list, timeout: float = 30):
    """Post toot on Mastodon.

    See: https://docs.joinmastodon.org/methods/statuses/

    Args:
        post_text: Status text.
        media_ids: IDs of already uploaded media to attach.
        timeout: Seconds to wait for the server before giving up. Without
            a timeout a stalled connection would block the caller forever.

    Returns:
        The response object of the status request.

    Raises:
        KeyError: The response JSON has no ``url`` field.
    """
    headers = {'Authorization': 'Bearer ' + MASTODON_API_ACCESS_TOKEN}
    params = {'status': post_text, 'media_ids[]': media_ids}
    response = requests.post(
        MASTODON_API_URL + '/statuses',
        data=params,
        headers=headers,
        timeout=timeout,
    )
    # The raw body is logged before 'url' is read so it is still visible
    # when that lookup fails.
    log.info('Publish toot. Response: %s', response.text)
    post_url = json.loads(response.text)['url']
    log.info('Status: %s URL: %s', response.status_code, post_url)
    return response
2022-08-28 16:53:36 +03:00
def toot(post_data: dict) -> None:
    """Upload media files, generate status text for Mastodon post and publish.

    At most four photos are uploaded. The cap is applied to photos only,
    so album attachments do not use up upload slots. Every attachment,
    uploaded or not, is listed as a link in the status text.

    Args:
        post_data: Decoded VK ``wall.get`` response for the post.
    """
    max_media = 4

    post_text = get_vk_post_text(post_data)
    vk_post_url = get_vk_post_url(post_data)
    attachments = get_vk_post_attachments(post_data)
    log.info('Attachments: %s', attachments)

    # Upload only first 4 photos and get media_ids
    photo_urls = [item['photo'] for item in attachments if 'photo' in item]
    media_ids = []
    for photo_url in photo_urls[:max_media]:
        log.info('Download image: %s', photo_url)
        tmpfile = download_file(photo_url)  # Download file from VK
        log.info('Image saved locally as: %s', tmpfile)
        try:
            # Upload file to Mastodon and remember its media ID
            media_ids.append(json.loads(post_media(tmpfile).text)['id'])
        finally:
            # Remove the local copy even when the upload fails.
            log.info('Remove local file: %s', tmpfile)
            os.remove(tmpfile)

    # Build attachments list
    # Example resulting line: 'Album: https://vk.com/album-26788782_284176934'
    lines = []
    for attachment in attachments:
        key = str(list(attachment.keys())[0])
        lines.append(key.title() + ': ' + attachment[key] + '\n')
    post_attachments = ''.join(lines)

    # Get status text
    text = (post_text + '\n\n' + 'Source: ' + vk_post_url
            + '\n\nAttachments:\n' + post_attachments)
    # Post toot!
    publish_toot(text, media_ids)
2022-05-21 02:27:14 +03:00
def touch_lock_file(file: str, post_id: int):
    """Overwrite the lock file with the given post ID.

    The file holds one JSON object whose ``post_id`` value is the ID
    converted to a string.
    """
    with open(file, 'w') as lock:
        json.dump({'post_id': str(post_id)}, lock)
2022-05-21 02:27:14 +03:00
def read_lock_file(file: str):
    """Return the ``post_id`` value stored in the JSON lock file."""
    with open(file, 'r') as lock:
        return json.load(lock)['post_id']
2022-05-21 02:27:14 +03:00
def poll():
    """Poll the VK group forever and toot every new last post.

    The ID of the last published post is kept in ``./data/post_id.json``
    so that the same post is not published twice. The pause between
    polls is ``[bridge].POLLING_TIME`` seconds (300 seconds when the
    option is missing or not a number).
    """
    log.info('Start polling %s', datetime.datetime.now().isoformat())
    lock_file = './data/post_id.json'
    if os.path.exists(lock_file):
        prev_post_id = read_lock_file(lock_file)
        log.info('Read last post ID from file: %s', lock_file)
        log.info('Last post ID: %s', prev_post_id)
    else:
        prev_post_id = 0
        log.info('Last post ID: 0')

    # The configuration is loaded once at import time, so the interval
    # does not need to be parsed again on every iteration.
    try:
        poll_time = int(config['bridge']['POLLING_TIME'])
    except (KeyError, TypeError, ValueError):
        poll_time = 300

    while True:
        post_data = get_vk_group_last_post()  # raw data
        try:
            post_id = post_data['response']['items'][0]['id']
        except (KeyError, IndexError, TypeError):
            # No 'response' payload or no posts in it: log the body and
            # try again on the next cycle instead of crashing the loop.
            log.error('Unexpected VK API response, retrying later: %s',
                      post_data)
        else:
            # Don't post duplicates
            if int(post_id) == int(prev_post_id):
                log.info('Post with VK ID %s already posted, skipping',
                         post_id)
            else:
                # Toot!
                log.info('Toot! VK post ID: %s, URL: %s',
                         post_id, get_vk_post_url(post_data))
                toot(post_data)
                touch_lock_file(lock_file, post_id)
                prev_post_id = post_id
        time.sleep(poll_time)
if __name__ == '__main__':
    # The [bridge] section is optional (its options fall back to defaults
    # where they are read), so a missing section must not abort startup.
    log.info(
        'VK-Toot started. [bridge]: %s; [mastodon].API_URL %s',
        dict(config.get('bridge', {})),
        config['mastodon']['API_URL'],
    )
    # Start polling
    poll()