import logging
import uuid
import re
-import urllib.parse
import simplejson as json
import cgi
+import datetime
+
+from urllib.parse import urlsplit
from Crypto.PublicKey import RSA
+from cachetools import LFUCache
+
+from . import app, CONFIG
from .database import DATABASE
from .http_debug import http_debug
-
-from cachetools import LFUCache
+from .remote_actor import fetch_actor
+from .http_signatures import sign_headers, generate_body_digest
# generate actor keys if not present
pubkey = privkey.publickey()
DATABASE["actorKeys"] = {
- "publicKey": pubkey.exportKey('PEM'),
- "privateKey": privkey.exportKey('PEM')
+ "publicKey": pubkey.exportKey('PEM').decode('utf-8'),
+ "privateKey": privkey.exportKey('PEM').decode('utf-8')
}
PRIVKEY = RSA.importKey(DATABASE["actorKeys"]["privateKey"])
PUBKEY = PRIVKEY.publickey()
-
-
-from . import app, CONFIG
-from .remote_actor import fetch_actor
-
-
-AP_CONFIG = CONFIG.get('ap', {'host': 'localhost','blocked_instances':[]})
+AP_CONFIG = CONFIG['ap']
CACHE_SIZE = CONFIG.get('cache-size', 16384)
-
-
CACHE = LFUCache(CACHE_SIZE)
+sem = asyncio.Semaphore(500)
+
async def actor(request):
data = {
"preferredUsername": "relay",
"url": "https://{}/actor".format(request.host)
}
- return aiohttp.web.json_response(data)
+ return aiohttp.web.json_response(data, content_type='application/activity+json')
app.router.add_get('/actor', actor)
-
-
-from .http_signatures import sign_headers
-
-
get_actor_inbox = lambda actor: actor.get('endpoints', {}).get('sharedInbox', actor['inbox'])
async def push_message_to_actor(actor, message, our_key_id):
inbox = get_actor_inbox(actor)
-
- url = urllib.parse.urlsplit(inbox)
+ url = urlsplit(inbox)
# XXX: Digest
data = json.dumps(message)
'(request-target)': 'post {}'.format(url.path),
'Content-Length': str(len(data)),
'Content-Type': 'application/activity+json',
- 'User-Agent': 'ActivityRelay'
+ 'User-Agent': 'ActivityRelay',
+ 'Host': url.netloc,
+ 'Digest': 'SHA-256={}'.format(generate_body_digest(data)),
+ 'Date': datetime.datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT')
}
headers['signature'] = sign_headers(headers, PRIVKEY, our_key_id)
headers.pop('(request-target)')
+ headers.pop('Host')
logging.debug('%r >> %r', inbox, message)
- try:
- async with aiohttp.ClientSession(trace_configs=[http_debug()]) as session:
- async with session.post(inbox, data=data, headers=headers) as resp:
- if resp.status == 202:
- return
- resp_payload = await resp.text()
- logging.debug('%r >> resp %r', inbox, resp_payload)
- except Exception as e:
- logging.info('Caught %r while pushing to %r.', e, inbox)
+ global sem
+ async with sem:
+ try:
+ async with aiohttp.ClientSession(trace_configs=[http_debug()]) as session:
+ async with session.post(inbox, data=data, headers=headers) as resp:
+ if resp.status == 202:
+ return
+ resp_payload = await resp.text()
+ logging.debug('%r >> resp %r', inbox, resp_payload)
+ except Exception as e:
+ logging.info('Caught %r while pushing to %r.', e, inbox)
-async def follow_remote_actor(actor_uri):
- logging.info('following: %r', actor_uri)
+async def fetch_nodeinfo(domain):
+ headers = {'Accept': 'application/activity+json'}
+ nodeinfo_url = None
+
+ wk_nodeinfo = await fetch_actor(f'https://{domain}/.well-known/nodeinfo', headers=headers)
+
+ if not wk_nodeinfo:
+ return
+
+ for link in wk_nodeinfo.get('links', ''):
+ if link['rel'] == 'http://nodeinfo.diaspora.software/ns/schema/2.0':
+ nodeinfo_url = link['href']
+ break
+
+ if not nodeinfo_url:
+ return
+
+ nodeinfo_data = await fetch_actor(nodeinfo_url, headers=headers)
+ software = nodeinfo_data.get('software')
+
+ return software.get('name') if software else None
+
+async def follow_remote_actor(actor_uri):
actor = await fetch_actor(actor_uri)
+
+ if not actor:
+ logging.info('failed to fetch actor at: %r', actor_uri)
+ return
+
+ if AP_CONFIG['whitelist_enabled'] is True and urlsplit(actor_uri).hostname not in AP_CONFIG['whitelist']:
+ logging.info('refusing to follow non-whitelisted actor: %r', actor_uri)
+ return
+
+ logging.info('following: %r', actor_uri)
message = {
"@context": "https://www.w3.org/ns/activitystreams",
async def unfollow_remote_actor(actor_uri):
- logging.info('unfollowing: %r', actor_uri)
-
actor = await fetch_actor(actor_uri)
+ if not actor:
+ logging.info('failed to fetch actor at: %r', actor_uri)
+ return
+
+ logging.info('unfollowing: %r', actor_uri)
message = {
"@context": "https://www.w3.org/ns/activitystreams",
return cgi.escape(no_tags)
-def distill_inboxes(actor):
+def distill_inboxes(actor, object_id):
global DATABASE
+ origin_hostname = urlsplit(object_id).hostname
+
inbox = get_actor_inbox(actor)
targets = [target for target in DATABASE.get('relay-list', []) if target != inbox]
+ targets = [target for target in targets if urlsplit(target).hostname != origin_hostname]
+ hostnames = [urlsplit(target).hostname for target in targets]
assert inbox not in targets
+ assert origin_hostname not in hostnames
return targets
logging.debug('>> already relayed %r as %r', object_id, CACHE[object_id])
return
- # don't relay mastodon announces -- causes LRP fake direction issues
- if data['type'] == 'Announce' and len(data.get('cc', [])) > 0:
- return
-
activity_id = "https://{}/activities/{}".format(request.host, uuid.uuid4())
message = {
"@context": "https://www.w3.org/ns/activitystreams",
"type": "Announce",
- "to": ["https://{}/actor/followers".format(request.host)],
+ "to": ["https://{}/followers".format(request.host)],
"actor": "https://{}/actor".format(request.host),
"object": object_id,
"id": activity_id
logging.debug('>> relay: %r', message)
- inboxes = distill_inboxes(actor)
+ inboxes = distill_inboxes(actor, object_id)
futures = [push_message_to_actor({'inbox': inbox}, message, 'https://{}/actor#main-key'.format(request.host)) for inbox in inboxes]
asyncio.ensure_future(asyncio.gather(*futures))
CACHE[object_id] = activity_id
+async def handle_forward(actor, data, request):
+ object_id = distill_object_id(data)
+
+ logging.debug('>> Relay %r', data)
+
+ inboxes = distill_inboxes(actor, object_id)
+
+ futures = [
+ push_message_to_actor(
+ {'inbox': inbox},
+ data,
+ 'https://{}/actor#main-key'.format(request.host))
+ for inbox in inboxes]
+ asyncio.ensure_future(asyncio.gather(*futures))
+
+
async def handle_follow(actor, data, request):
global DATABASE
following = DATABASE.get('relay-list', [])
inbox = get_actor_inbox(actor)
- if urllib.parse.urlsplit(inbox).hostname in AP_CONFIG['blocked_instances']:
+
+ if urlsplit(inbox).hostname in AP_CONFIG['blocked_instances']:
return
if inbox not in following:
following += [inbox]
DATABASE['relay-list'] = following
- if data['object'].endswith('/actor'):
- asyncio.ensure_future(follow_remote_actor(actor['id']))
+ asyncio.ensure_future(follow_remote_actor(actor['id']))
message = {
"@context": "https://www.w3.org/ns/activitystreams",
# this is wrong per litepub, but mastodon < 2.4 is not compliant with that profile.
"object": {
- "type": "Follow",
- "id": data["id"],
- "object": "https://{}/actor".format(request.host),
- "actor": actor["id"]
+ "type": "Follow",
+ "id": data["id"],
+ "object": "https://{}/actor".format(request.host),
+ "actor": actor["id"]
},
"id": "https://{}/activities/{}".format(request.host, uuid.uuid4()),
following.remove(inbox)
DATABASE['relay-list'] = following
- if child['object'].endswith('/actor'):
- await unfollow_remote_actor(actor['id'])
+ await unfollow_remote_actor(actor['id'])
processors = {
'Announce': handle_relay,
'Create': handle_relay,
+ 'Delete': handle_forward,
'Follow': handle_follow,
- 'Undo': handle_undo
+ 'Undo': handle_undo,
+ 'Update': handle_forward,
}
async def inbox(request):
data = await request.json()
+ instance = urlsplit(data['actor']).hostname
+
+ if AP_CONFIG['blocked_software']:
+ software = await fetch_nodeinfo(instance)
+
+ if software and software.lower() in AP_CONFIG['blocked_software']:
+ raise aiohttp.web.HTTPUnauthorized(body='relays have been blocked', content_type='text/plain')
if 'actor' not in data or not request['validated']:
raise aiohttp.web.HTTPUnauthorized(body='access denied', content_type='text/plain')
+ elif data['type'] != 'Follow' and 'https://{}/inbox'.format(instance) not in DATABASE['relay-list']:
+ raise aiohttp.web.HTTPUnauthorized(body='access denied', content_type='text/plain')
+
+ elif AP_CONFIG['whitelist_enabled'] is True and instance not in AP_CONFIG['whitelist']:
+ raise aiohttp.web.HTTPUnauthorized(body='access denied', content_type='text/plain')
+
actor = await fetch_actor(data["actor"])
actor_uri = 'https://{}/actor'.format(request.host)
return aiohttp.web.Response(body=b'{}', content_type='application/activity+json')
-
app.router.add_post('/inbox', inbox)