215 lines
7.5 KiB
Python
215 lines
7.5 KiB
Python
# Release identifier of this bridge script.
__version__ = '0.2.1'
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import time
|
|
import datetime
|
|
import logging
|
|
import shutil
|
|
import urllib.parse
|
|
|
|
import requests
|
|
|
|
|
|
# All connection settings are mandatory: a missing variable raises KeyError
# while the module is being imported.

# Mastodon: API base URL ('/media' and '/statuses' are appended to it) and the
# token sent as 'Authorization: Bearer <token>'.
MASTODON_API_URL = os.environ['MASTODON_API_URL']
MASTODON_API_ACCESS_TOKEN = os.environ['MASTODON_API_ACCESS_TOKEN']

# VK: API base URL ('/wall.get' is appended to it), the API version sent as
# 'v', the access token, and the domain of the group whose wall is mirrored.
VK_API_URL = os.environ['VK_API_URL']
VK_API_VERSION = os.environ['VK_API_VERSION']
VK_API_ACCESS_TOKEN = os.environ['VK_API_ACCESS_TOKEN']
VK_GROUP_DOMAIN = os.environ['VK_GROUP_DOMAIN']
|
|
|
|
# Set up logger: records at INFO and above are written to stdout, formatted
# as '[<timestamp>: <LEVEL>] <message>'.
handler = logging.StreamHandler(stream=sys.stdout)
handler.setFormatter(
    logging.Formatter(fmt='[%(asctime)s: %(levelname)s] %(message)s')
)

logger = logging.getLogger('vk_mastodon_bridge')
logger.setLevel(logging.INFO)
logger.addHandler(handler)
|
|
|
|
def get_vk_group_last_post() -> dict:
    """Return dict with VK group last post data.

    Calls the ``wall.get`` method of the VK API for ``VK_GROUP_DOMAIN`` with
    ``count=1`` and decodes the JSON body of the answer.

    Returns:
        The decoded JSON response as returned by the API.

    Raises:
        requests.RequestException: on a network failure or timeout.
        ValueError: if the response body is not valid JSON.
    """
    # Let requests build the query string so every value is URL-encoded.
    params = {
        'v': VK_API_VERSION,
        'access_token': VK_API_ACCESS_TOKEN,
        'domain': VK_GROUP_DOMAIN,
        'count': 1,
    }
    # Without a timeout a stalled connection would block the poller forever.
    response = requests.get(VK_API_URL + '/wall.get', params=params, timeout=30)
    return json.loads(response.text)
|
|
|
|
def get_vk_post_text(post_data: dict) -> str:
    """Return the text of the first post in a ``wall.get`` response.

    See: https://dev.vk.com/reference/objects/post
    """
    first_post = post_data['response']['items'][0]
    return first_post['text']
|
|
|
|
def get_vk_post_url(post_data: dict) -> str:
    """Return link to original post on vk.com

    The link has the form ``https://vk.com/wall<owner_id>_<id>`` and is built
    from the first post in the response.

    See: https://dev.vk.com/reference/objects/post
    """
    post = post_data['response']['items'][0]
    return 'https://vk.com/wall{}_{}'.format(post['owner_id'], post['id'])
|
|
|
|
def get_vk_post_attachments(post_data: dict) -> list:
    """Collect the supported attachments of the first post in *post_data*.

    Only ``photo`` and ``album`` attachments are converted; every other
    attachment type (video, audio, doc, ...) is ignored. See attachments at
    https://dev.vk.com/method/wall.post

    Args:
        post_data: decoded ``wall.get`` response.

    Returns:
        List of single-key dicts, in the order the attachments appear::

            [{'photo': 'url'}, {'album': 'url'}]

        The list is empty when the post has no ``attachments`` entry.
    """
    post = post_data['response']['items'][0]
    attachments = []

    # Tolerate a missing or null 'attachments' entry instead of raising.
    for attachment in post.get('attachments') or []:
        kind = attachment['type']
        if kind == 'photo':
            sizes = attachment['photo']['sizes']
            if not sizes:
                # No rendition to link to; never reuse a previous photo's URL.
                continue
            # Get photo in max size by height (photos are proportionally
            # resized). Scanning in reverse makes the last entry win a tie.
            largest = max(reversed(sizes), key=lambda size: size['height'])
            attachments.append({'photo': largest['url']})
        elif kind == 'album':
            album = attachment['album']
            album_url = 'https://vk.com/album{}_{}'.format(
                album['owner_id'], album['id'])
            attachments.append({'album': album_url})
    return attachments
|
|
|
|
def download_file(url: str) -> str:
    """Save file to /tmp. Return file path.

    The local file name is the last path component of *url*.

    Args:
        url: address of the file to download.

    Returns:
        Path of the downloaded file under ``/tmp``.

    Raises:
        requests.HTTPError: if the server answers with an error status, so
            that an error page is never stored in place of the file.
        requests.RequestException: on a network failure or timeout.
    """
    filename = '/tmp/' + os.path.basename(urllib.parse.urlparse(url).path)
    # The context manager releases the streamed connection even when
    # writing the file fails.
    with requests.get(url, stream=True, timeout=30) as response:
        response.raise_for_status()
        # Make the raw stream undo any Content-Encoding (gzip/deflate) so the
        # bytes written to disk are the file itself.
        response.raw.decode_content = True
        with open(filename, 'wb') as out_file:
            shutil.copyfileobj(response.raw, out_file)
    return filename
|
|
|
|
def post_media(file: str) -> 'requests.Response':
    """Upload media file to Mastodon.

    After the upload the function sleeps for ``REQUEST_DELAY`` seconds
    (environment variable, default 1) to prevent HTTP 429 responses.

    Args:
        file: path of the local file to upload.

    Returns:
        The response of the upload request.
    """
    headers = {'Authorization': 'Bearer ' + MASTODON_API_ACCESS_TOKEN}
    # Close the file handle as soon as the request has been sent.
    with open(file, 'rb') as media:
        response = requests.post(MASTODON_API_URL + '/media',
                                 files={'file': media}, headers=headers)
    logger.info('Post media on Mastodon. Response: %s %s',
                response.status_code, response.text)

    # Sleep some seconds to prevent HTTP 429 response.
    try:
        request_delay = int(os.environ['REQUEST_DELAY'])
    except (KeyError, TypeError, ValueError):
        # Unset or non-numeric value: fall back to the default delay.
        request_delay = 1
    time.sleep(request_delay)
    return response
|
|
|
|
def publish_toot(post_text: str, media_ids: list) -> 'requests.Response':
    """Post toot on Mastodon.

    See: https://docs.joinmastodon.org/methods/statuses/

    Args:
        post_text: text of the status.
        media_ids: IDs of previously uploaded media to attach.

    Returns:
        The response of the publish request.

    Raises:
        requests.HTTPError: if the server answers with an error status. The
            status code and response body are logged first.
    """
    headers = {'Authorization': 'Bearer ' + MASTODON_API_ACCESS_TOKEN}
    params = {'status': post_text, 'media_ids[]': media_ids}
    response = requests.post(MASTODON_API_URL + '/statuses',
                             data=params, headers=headers)
    if not response.ok:
        # An error body has no 'url'; report what the server said instead of
        # failing below with an uninformative KeyError.
        logger.error('Publish status failed. Response: %s %s',
                     response.status_code, response.text)
        response.raise_for_status()
    post_url = json.loads(response.text)['url']
    logger.info('Publish status. Response: %s Status: %s',
                response.status_code, post_url)
    return response
|
|
|
|
def _upload_photos(attachments: list, limit: int = 4) -> list:
    """Copy up to *limit* photo attachments to Mastodon; return media IDs."""
    # Pick the first `limit` photos, not the photos among the first `limit`
    # attachments: albums in between must not use up upload slots.
    photo_urls = [item['photo'] for item in attachments if 'photo' in item]
    media_ids = []
    for photo_url in photo_urls[:limit]:
        logger.info('Download image: %s', photo_url)
        tmpfile = download_file(photo_url)  # Download file from VK
        logger.info('Image saved locally as: %s', tmpfile)
        try:
            # Upload file to Mastodon
            media_id = json.loads(post_media(tmpfile).text)['id']
        finally:
            # Remove the local copy even when the upload fails.
            logger.info('Remove local file: %s', tmpfile)
            os.remove(tmpfile)
        media_ids.append(media_id)  # Save uploaded media IDs
    return media_ids


def _build_status_text(post_text: str, vk_post_url: str,
                       attachments: list) -> str:
    """Compose the status: post text, source link, one line per attachment."""
    text = post_text + '\n\n' + 'Source: ' + vk_post_url
    if not attachments:
        # No attachments: do not emit an empty 'Attachments:' header.
        return text
    lines = []
    for attachment in attachments:
        kind, url = next(iter(attachment.items()))
        # Example resulting line:
        # 'Album: https://vk.com/album-26788782_284176934'
        lines.append(str(kind).title() + ': ' + url + '\n')
    return text + '\n\nAttachments:\n' + ''.join(lines)


def post_toot(post_data: dict) -> None:
    """Upload media files, generate status text for Mastodon post and publish.

    Only the first 4 photo attachments are uploaded as media. Every
    attachment (photo or album) is also listed as a link in the status text,
    together with a link to the original VK post.

    Args:
        post_data: decoded ``wall.get`` response holding the post to mirror.
    """
    post_text = get_vk_post_text(post_data)
    vk_post_url = get_vk_post_url(post_data)
    attachments = get_vk_post_attachments(post_data)
    logger.info('Attachments: %s', str(attachments))

    # Upload attachments!
    media_ids = _upload_photos(attachments)

    # Post toot!
    publish_toot(_build_status_text(post_text, vk_post_url, attachments),
                 media_ids)
|
|
|
|
def touch_lock_file(file: str, post_id: int) -> None:
    """Store *post_id* in the JSON lock file *file*.

    The file holds a single object, ``{"post_id": "<id>"}``, with the ID
    written as a string. Any previous content is overwritten.

    Args:
        file: path of the lock file.
        post_id: ID of the post to remember.
    """
    # Create the parent directory on first use, so that recording an ID does
    # not fail with FileNotFoundError.
    parent = os.path.dirname(file)
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(file, 'w') as lock:
        json.dump({'post_id': str(post_id)}, lock)
|
|
|
|
def read_lock_file(file: str):
    """Return the ``post_id`` value stored in the JSON lock file *file*."""
    with open(file, 'r') as lock:
        return json.load(lock)['post_id']
|
|
|
|
def poll():
    """Mirror the newest VK group post to Mastodon in an endless loop.

    The ID of the last mirrored post is kept in ``./data/post_id.json`` so a
    post is not published twice. Between cycles the function sleeps for
    ``POLLING_TIME`` seconds (environment variable, read once at start-up,
    default 300). This function never returns.
    """
    logger.info('Start polling %s' % datetime.datetime.now().isoformat())
    lock_file = './data/post_id.json'
    if os.path.exists(lock_file):
        prev_post_id = read_lock_file(lock_file)
        logger.info('Read last post ID from file: %s' % lock_file)
        logger.info('Last post ID: %s' % prev_post_id)
    else:
        prev_post_id = 0
        logger.info('Last post ID: 0')

    try:
        poll_time = int(os.environ['POLLING_TIME'])
    except (KeyError, TypeError, ValueError):
        # Unset or non-numeric value: fall back to the default interval.
        poll_time = 300

    while True:
        post_data = get_vk_group_last_post()  # raw data
        try:
            post_id = post_data['response']['items'][0]['id']
        except (KeyError, IndexError):
            # No 'response' entry or an empty item list: nothing to mirror.
            post_id = None

        if post_id is None:
            logger.error('VK response contains no post, skipping this cycle')
        elif int(post_id) == int(prev_post_id):
            # Don't post duplicates
            logger.info('Post with VK ID %s already posted, skipping' % post_id)
        else:
            # Toot!
            logger.info('Toot! VK post ID: %s' % post_id)
            post_toot(post_data)
            # Remember the ID only after the toot went out.
            touch_lock_file(lock_file, post_id)
            prev_post_id = post_id

        time.sleep(poll_time)
|
|
|
|
|
|
# Start the polling loop only when the file is executed as a script.
if __name__ == '__main__':
    poll()
|