143 lines
4.8 KiB
Python
143 lines
4.8 KiB
Python
|
__version__ = '0.1.0'
|
||
|
|
||
|
import os
|
||
|
import json
|
||
|
import time
|
||
|
import requests
|
||
|
|
||
|
|
||
|
MASTODON_API_URL = os.environ['MASTODON_API_URL']
|
||
|
MASTODON_API_ACCESS_TOKEN = os.environ['MASTODON_API_ACCESS_TOKEN']
|
||
|
VK_API_URL = os.environ['VK_API_URL']
|
||
|
VK_API_VERSION = os.environ['VK_API_VERSION']
|
||
|
VK_API_ACCESS_TOKEN = os.environ['VK_API_ACCESS_TOKEN']
|
||
|
VK_GROUP_DOMAIN = os.environ['VK_GROUP_DOMAIN']
|
||
|
|
||
|
def get_vk_group_last_post():
|
||
|
"""Return dict with VK group last post data."""
|
||
|
return json.loads(requests.get(VK_API_URL + '/wall.get' \
|
||
|
+ '?v=' + VK_API_VERSION \
|
||
|
+ '&access_token=' + VK_API_ACCESS_TOKEN \
|
||
|
+ '&domain=' + VK_GROUP_DOMAIN \
|
||
|
+ '&count=1').text)
|
||
|
|
||
|
def get_vk_post_text(post_data: dict) -> str:
|
||
|
"""See: https://dev.vk.com/reference/objects/post"""
|
||
|
return post_data['response']['items'][0]['text']
|
||
|
|
||
|
def get_vk_post_url(post_data: dict) -> str:
|
||
|
"""Return link to original post on vk.com
|
||
|
See: https://dev.vk.com/reference/objects/post
|
||
|
"""
|
||
|
wall_id = str(post_data['response']['items'][0]['owner_id'])
|
||
|
post_id = str(post_data['response']['items'][0]['id'])
|
||
|
return 'https://vk.com/wall' + wall_id + '_' + post_id
|
||
|
|
||
|
def get_vk_post_attachments(post_data: dict) -> list:
|
||
|
"""Process attachments. See attachments at
|
||
|
https://dev.vk.com/method/wall.post
|
||
|
Return list with following structure::
|
||
|
|
||
|
[{'photo': 'url'}, {'album': 'url'}]
|
||
|
"""
|
||
|
attachments = []
|
||
|
raw_attachments = post_data['response']['items'][0]['attachments']
|
||
|
|
||
|
for attachment in raw_attachments:
|
||
|
if attachment['type'] == 'photo':
|
||
|
# Get photo in max size by height (photos are proportionally resized)
|
||
|
height = [ photo['height'] for photo in attachment['photo']['sizes'] ]
|
||
|
for photo in attachment['photo']['sizes']:
|
||
|
if photo['height'] == max(height):
|
||
|
photo_url = photo['url']
|
||
|
attachments.append({'photo': photo_url})
|
||
|
elif attachment['type'] == 'video':
|
||
|
pass
|
||
|
elif attachment['type'] == 'audio':
|
||
|
pass
|
||
|
elif attachment['type'] == 'doc':
|
||
|
pass
|
||
|
elif attachment['type'] == 'page':
|
||
|
pass
|
||
|
elif attachment['type'] == 'note':
|
||
|
pass
|
||
|
elif attachment['type'] == 'pull':
|
||
|
pass
|
||
|
elif attachment['type'] == 'album':
|
||
|
owner_id = str(attachment['album']['owner_id'])
|
||
|
id = str(attachment['album']['id'])
|
||
|
album_url = 'https://vk.com/album' + owner_id + '_' + id
|
||
|
attachments.append({'album': album_url})
|
||
|
elif attachment['type'] == 'market':
|
||
|
pass
|
||
|
elif attachment['type'] == 'market_album':
|
||
|
pass
|
||
|
elif attachment['type'] == 'audio_playlist':
|
||
|
pass
|
||
|
return attachments
|
||
|
|
||
|
def build_post(post_data: dict) -> str:
|
||
|
post_text = get_vk_post_text(post_data)
|
||
|
vk_post_url = get_vk_post_url(post_data)
|
||
|
attachments = get_vk_post_attachments(post_data)
|
||
|
|
||
|
# Build attachments list
|
||
|
post_attachments = ''
|
||
|
for attachment in attachments:
|
||
|
key = str(list(attachment.keys())[0])
|
||
|
# Example resulting string: 'Album: https://vk.com/album-26788782_284176934'
|
||
|
post_attachments = post_attachments + key.title() + ': ' + attachment[key] + '\n'
|
||
|
return post_text + '\n\n' + 'Source: ' + vk_post_url + '\n\nAttachments:\n' + post_attachments
|
||
|
|
||
|
def post_toot(post_text: str):
|
||
|
"""Post toot on Mastodon.
|
||
|
See: https://docs.joinmastodon.org/methods/statuses/
|
||
|
"""
|
||
|
auth = {'Authorization': 'Bearer ' + MASTODON_API_ACCESS_TOKEN}
|
||
|
params = {'status': post_text}
|
||
|
response = requests.post(MASTODON_API_URL + '/statuses', data=params, headers=auth)
|
||
|
return response
|
||
|
|
||
|
def touch_lock_file(file: str, post_id: int):
|
||
|
with open(file, 'w') as lock:
|
||
|
lock.write(str(post_id))
|
||
|
|
||
|
def read_lock_file(file: str):
|
||
|
with open(file, 'r') as lock:
|
||
|
return int(lock.read())
|
||
|
|
||
|
def poll():
|
||
|
lock_file = './.last_post_id.tmp'
|
||
|
if os.path.exists(lock_file):
|
||
|
prev_post_id = read_lock_file(lock_file)
|
||
|
print('Read last post ID from file:', lock_file)
|
||
|
else:
|
||
|
prev_post_id = 0
|
||
|
print('Last post ID is 0')
|
||
|
|
||
|
while True:
|
||
|
post_data = get_vk_group_last_post() # raw data
|
||
|
post_id = post_data['response']['items'][0]['id']
|
||
|
|
||
|
# Don't post duplicates
|
||
|
if post_id == prev_post_id:
|
||
|
print('Skip posting')
|
||
|
else:
|
||
|
# Toot!
|
||
|
print('Toot! VK post ID:', post_id)
|
||
|
post = build_post(post_data)
|
||
|
post_toot(post)
|
||
|
|
||
|
touch_lock_file(lock_file, post_id)
|
||
|
prev_post_id = read_lock_file(lock_file)
|
||
|
try:
|
||
|
poll_time = int(os.environ['POLLING_TIME'])
|
||
|
except (KeyError, TypeError):
|
||
|
poll_time = 300
|
||
|
|
||
|
time.sleep(poll_time)
|
||
|
|
||
|
|
||
|
if __name__ == '__main__':
|
||
|
poll()
|