"""Repost the latest post of a VK group to Mastodon.

The script polls the VK ``wall.get`` method, and when the ID of the newest
post differs from the one remembered in the lock file, it uploads the post's
photos to Mastodon and publishes a toot with the post text and a link to the
original post.
"""

__version__ = '0.2.0'

import os
import json
import time
import shutil
from urllib.parse import urlparse

import requests

MASTODON_API_URL = os.environ['MASTODON_API_URL']
MASTODON_API_ACCESS_TOKEN = os.environ['MASTODON_API_ACCESS_TOKEN']
VK_API_URL = os.environ['VK_API_URL']
VK_API_VERSION = os.environ['VK_API_VERSION']
VK_API_ACCESS_TOKEN = os.environ['VK_API_ACCESS_TOKEN']
VK_GROUP_DOMAIN = os.environ['VK_GROUP_DOMAIN']

# Number of photos uploaded per toot (previously a hard-coded 4).
MAX_MEDIA_ATTACHMENTS = 4

# Polling interval in seconds used when POLLING_TIME is unset or invalid.
DEFAULT_POLLING_TIME = 300


def get_vk_group_last_post():
    """Return dict with VK group last post data.

    Calls the ``wall.get`` method with ``count=1`` and returns the decoded
    JSON body as is (no validation of the payload is done here).
    """
    # Let requests build the query string so that values are URL-encoded.
    params = {
        'v': VK_API_VERSION,
        'access_token': VK_API_ACCESS_TOKEN,
        'domain': VK_GROUP_DOMAIN,
        'count': 1,
    }
    response = requests.get(VK_API_URL + '/wall.get', params=params)
    return json.loads(response.text)


def get_vk_post_text(post_data: dict) -> str:
    """Return text of the first post in ``post_data``.

    See: https://dev.vk.com/reference/objects/post
    """
    return post_data['response']['items'][0]['text']


def get_vk_post_url(post_data: dict) -> str:
    """Return link to original post on vk.com

    See: https://dev.vk.com/reference/objects/post
    """
    post = post_data['response']['items'][0]
    wall_id = str(post['owner_id'])
    post_id = str(post['id'])
    return 'https://vk.com/wall' + wall_id + '_' + post_id


def get_vk_post_attachments(post_data: dict) -> list:
    """Process attachments.

    See attachments at https://dev.vk.com/method/wall.post

    Only ``photo`` and ``album`` attachments are handled; every other
    attachment type is ignored. A post without an ``attachments`` key yields
    an empty list.

    Return list of dicts with following structure::

        [{'photo': 'url'}, {'album': 'url'}]
    """
    attachments = []
    raw_attachments = post_data['response']['items'][0].get('attachments', [])
    for attachment in raw_attachments:
        kind = attachment['type']
        if kind == 'photo':
            sizes = attachment['photo']['sizes']
            if not sizes:
                # Nothing to pick a URL from; skip instead of reusing a
                # URL left over from a previous attachment.
                continue
            # Get photo in max size by height (photos are proportionally
            # resized). Iterating in reverse keeps the previous tie-break:
            # among equally tall variants the last listed one wins.
            largest = max(reversed(sizes), key=lambda size: size['height'])
            attachments.append({'photo': largest['url']})
        elif kind == 'album':
            owner_id = str(attachment['album']['owner_id'])
            album_id = str(attachment['album']['id'])
            album_url = 'https://vk.com/album' + owner_id + '_' + album_id
            attachments.append({'album': album_url})
        # Other types (video, audio, doc, page, note, market, market_album,
        # audio_playlist, ...) are not supported and are skipped.
    return attachments


def download_file(url: str) -> str:
    """Save file to /tmp. Return file path

    The file name is the last path component of ``url``.
    """
    filename = '/tmp/' + os.path.basename(urlparse(url).path)
    # The context manager releases the streamed connection when done.
    with requests.get(url, stream=True) as response:
        with open(filename, 'wb') as out_file:
            shutil.copyfileobj(response.raw, out_file)
    return filename


def post_media(file: str) -> requests.Response:
    """Upload media file to Mastodon.

    Return the HTTP response of the ``/media`` endpoint; the caller reads
    the media ID from its JSON body.
    """
    headers = {'Authorization': 'Bearer ' + MASTODON_API_ACCESS_TOKEN}
    # Keep the file open only for the duration of the upload.
    with open(file, 'rb') as media_file:
        response = requests.post(MASTODON_API_URL + '/media',
                                 files={'file': media_file},
                                 headers=headers)
    return response


def publish_toot(post_text: str, media_ids: list):
    """Post toot on Mastodon.

    See: https://docs.joinmastodon.org/methods/statuses/

    Return the HTTP response of the ``/statuses`` endpoint.
    """
    headers = {'Authorization': 'Bearer ' + MASTODON_API_ACCESS_TOKEN}
    params = {'status': post_text, 'media_ids[]': media_ids}
    response = requests.post(MASTODON_API_URL + '/statuses',
                             data=params, headers=headers)
    return response


def post_toot(post_data: dict) -> None:
    """Upload media files, generate status text for Mastodon post and publish.

    At most ``MAX_MEDIA_ATTACHMENTS`` photos are uploaded; all attachments
    (photos and albums) are additionally listed as links in the status text.
    """
    post_text = get_vk_post_text(post_data)
    vk_post_url = get_vk_post_url(post_data)
    attachments = get_vk_post_attachments(post_data)

    # Upload attachments!
    # Upload only the first photos and collect their media IDs. Slicing
    # copes with posts that have fewer photos than the limit (or none).
    photo_urls = [item['photo'] for item in attachments if 'photo' in item]
    media_ids = []
    for photo_url in photo_urls[:MAX_MEDIA_ATTACHMENTS]:
        print('Download image:', photo_url)  # Log
        tmpfile = download_file(photo_url)  # Download file from VK
        print('Saved as:', tmpfile)  # Log
        try:
            print('Upload to Mastodon:', tmpfile)  # Log
            # Upload file to Mastodon
            media_id = json.loads(post_media(tmpfile).text)['id']
            print('Uploaded. Media ID:', media_id)  # Log
            media_ids.append(media_id)  # Save uploaded media IDs
        finally:
            # Do not leave the temporary file behind if the upload fails.
            print('Remove local file:', tmpfile)  # Log
            os.remove(tmpfile)  # Remove local file

    # Build attachments list
    # Example resulting line: 'Album: https://vk.com/album-26788782_284176934'
    post_attachments = ''.join(
        kind.title() + ': ' + url + '\n'
        for attachment in attachments
        for kind, url in attachment.items()
    )

    # Get status text
    text = post_text + '\n\n' + 'Source: ' + vk_post_url
    if post_attachments:
        text += '\n\nAttachments:\n' + post_attachments

    # Post toot!
    publish_toot(text, media_ids)


def touch_lock_file(file: str, post_id: int):
    """Write ``post_id`` to ``file``, creating its directory if needed."""
    directory = os.path.dirname(file)
    if directory:
        os.makedirs(directory, exist_ok=True)
    with open(file, 'w') as lock:
        lock.write(str(post_id))


def read_lock_file(file: str):
    """Return the post ID stored in ``file`` as an int."""
    with open(file, 'r') as lock:
        return int(lock.read())


def _get_poll_time() -> int:
    """Return polling interval in seconds from POLLING_TIME or the default."""
    try:
        return int(os.environ['POLLING_TIME'])
    except (KeyError, ValueError):
        # Unset or not a number: fall back to the default interval.
        return DEFAULT_POLLING_TIME


def poll():
    """Check the VK group for a new post forever and toot each new one.

    The ID of the last reposted post is kept in ``./data/last_post_id`` so
    the same post is not published twice.
    """
    lock_file = './data/last_post_id'
    if os.path.exists(lock_file):
        prev_post_id = read_lock_file(lock_file)
        print('Read last post ID from file:', lock_file)
    else:
        prev_post_id = 0
        print('Last post ID is 0')

    while True:
        post_data = get_vk_group_last_post()  # raw data
        try:
            post_id = post_data['response']['items'][0]['id']
        except (KeyError, IndexError):
            # No 'response' key or an empty 'items' list: nothing to post
            # this round, try again after the usual delay.
            print('No post in VK response, skip')  # Log
        else:
            # Don't post duplicates
            if post_id == prev_post_id:
                print('Skip posting')  # Log
            else:
                # Toot!
                print('===> Toot! VK post ID:', post_id)  # Log
                post_toot(post_data)
                touch_lock_file(lock_file, post_id)
                prev_post_id = read_lock_file(lock_file)

        time.sleep(_get_poll_time())


if __name__ == '__main__':
    poll()