Apply 1 suggestion(s) to 1 file(s)
[relay.git/.git] / relay / actor.py
index 1aebe7f..b45ee6f 100644 (file)
@@ -4,14 +4,19 @@ import asyncio
 import logging
 import uuid
 import re
-import urllib.parse
 import simplejson as json
 import cgi
+import datetime
+
+from urllib.parse import urlsplit
 from Crypto.PublicKey import RSA
+from cachetools import LFUCache
+
+from . import app, CONFIG
 from .database import DATABASE
 from .http_debug import http_debug
-
-from cachetools import LFUCache
+from .remote_actor import fetch_actor
+from .http_signatures import sign_headers, generate_body_digest
 
 
 # generate actor keys if not present
@@ -22,25 +27,19 @@ if "actorKeys" not in DATABASE:
     pubkey = privkey.publickey()
 
     DATABASE["actorKeys"] = {
-        "publicKey": pubkey.exportKey('PEM'),
-        "privateKey": privkey.exportKey('PEM')
+        "publicKey": pubkey.exportKey('PEM').decode('utf-8'),
+        "privateKey": privkey.exportKey('PEM').decode('utf-8')
     }
 
 
 PRIVKEY = RSA.importKey(DATABASE["actorKeys"]["privateKey"])
 PUBKEY = PRIVKEY.publickey()
-
-
-from . import app, CONFIG
-from .remote_actor import fetch_actor
-
-
-AP_CONFIG = CONFIG.get('ap', {'host': 'localhost','blocked_instances':[]})
+AP_CONFIG = CONFIG['ap']
 CACHE_SIZE = CONFIG.get('cache-size', 16384)
-
-
 CACHE = LFUCache(CACHE_SIZE)
 
+sem = asyncio.Semaphore(500)
+
 
 async def actor(request):
     data = {
@@ -63,22 +62,16 @@ async def actor(request):
         "preferredUsername": "relay",
         "url": "https://{}/actor".format(request.host)
     }
-    return aiohttp.web.json_response(data)
+    return aiohttp.web.json_response(data, content_type='application/activity+json')
 
 
 app.router.add_get('/actor', actor)
-
-
-from .http_signatures import sign_headers
-
-
 get_actor_inbox = lambda actor: actor.get('endpoints', {}).get('sharedInbox', actor['inbox'])
 
 
 async def push_message_to_actor(actor, message, our_key_id):
     inbox = get_actor_inbox(actor)
-
-    url = urllib.parse.urlsplit(inbox)
+    url = urlsplit(inbox)
 
     # XXX: Digest
     data = json.dumps(message)
@@ -86,28 +79,65 @@ async def push_message_to_actor(actor, message, our_key_id):
         '(request-target)': 'post {}'.format(url.path),
         'Content-Length': str(len(data)),
         'Content-Type': 'application/activity+json',
-        'User-Agent': 'ActivityRelay'
+        'User-Agent': 'ActivityRelay',
+        'Host': url.netloc,
+        'Digest': 'SHA-256={}'.format(generate_body_digest(data)),
+        'Date': datetime.datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT')
     }
     headers['signature'] = sign_headers(headers, PRIVKEY, our_key_id)
     headers.pop('(request-target)')
+    headers.pop('Host')
 
     logging.debug('%r >> %r', inbox, message)
 
-    try:
-        async with aiohttp.ClientSession(trace_configs=[http_debug()]) as session:
-            async with session.post(inbox, data=data, headers=headers) as resp:
-                if resp.status == 202:
-                    return
-                resp_payload = await resp.text()
-                logging.debug('%r >> resp %r', inbox, resp_payload)
-    except Exception as e:
-        logging.info('Caught %r while pushing to %r.', e, inbox)
+    global sem
+    async with sem:
+        try:
+            async with aiohttp.ClientSession(trace_configs=[http_debug()]) as session:
+                async with session.post(inbox, data=data, headers=headers) as resp:
+                    if resp.status == 202:
+                        return
+                    resp_payload = await resp.text()
+                    logging.debug('%r >> resp %r', inbox, resp_payload)
+        except Exception as e:
+            logging.info('Caught %r while pushing to %r.', e, inbox)
 
 
-async def follow_remote_actor(actor_uri):
-    logging.info('following: %r', actor_uri)
+async def fetch_nodeinfo(domain):
+    headers = {'Accept': 'application/activity+json'}
+    nodeinfo_url = None
+
+    wk_nodeinfo = await fetch_actor(f'https://{domain}/.well-known/nodeinfo', headers=headers)
+
+    if not wk_nodeinfo:
+        return
+
+    for link in wk_nodeinfo.get('links', ''):
+        if link['rel'] == 'http://nodeinfo.diaspora.software/ns/schema/2.0':
+            nodeinfo_url = link['href']
+            break
+
+    if not nodeinfo_url:
+        return
+
+    nodeinfo_data = await fetch_actor(nodeinfo_url, headers=headers)
+    software = nodeinfo_data.get('software')
+
+    return software.get('name') if software else None
 
+
+async def follow_remote_actor(actor_uri):
     actor = await fetch_actor(actor_uri)
+    
+    if not actor:
+        logging.info('failed to fetch actor at: %r', actor_uri)
+        return
+
+    if AP_CONFIG['whitelist_enabled'] is True and urlsplit(actor_uri).hostname not in AP_CONFIG['whitelist']:
+        logging.info('refusing to follow non-whitelisted actor: %r', actor_uri)
+        return
+
+    logging.info('following: %r', actor_uri)
 
     message = {
         "@context": "https://www.w3.org/ns/activitystreams",
@@ -121,9 +151,12 @@ async def follow_remote_actor(actor_uri):
 
 
 async def unfollow_remote_actor(actor_uri):
-    logging.info('unfollowing: %r', actor_uri)
-
     actor = await fetch_actor(actor_uri)
+    if not actor:
+        logging.info('failed to fetch actor at: %r', actor_uri)
+        return
+
+    logging.info('unfollowing: %r', actor_uri)
 
     message = {
         "@context": "https://www.w3.org/ns/activitystreams",
@@ -147,13 +180,18 @@ def strip_html(data):
     return cgi.escape(no_tags)
 
 
-def distill_inboxes(actor):
+def distill_inboxes(actor, object_id):
     global DATABASE
 
+    origin_hostname = urlsplit(object_id).hostname
+
     inbox = get_actor_inbox(actor)
     targets = [target for target in DATABASE.get('relay-list', []) if target != inbox]
+    targets = [target for target in targets if urlsplit(target).hostname != origin_hostname]
+    hostnames = [urlsplit(target).hostname for target in targets]
 
     assert inbox not in targets
+    assert origin_hostname not in hostnames
 
     return targets
 
@@ -177,16 +215,12 @@ async def handle_relay(actor, data, request):
         logging.debug('>> already relayed %r as %r', object_id, CACHE[object_id])
         return
 
-    # don't relay mastodon announces -- causes LRP fake direction issues
-    if data['type'] == 'Announce' and len(data.get('cc', [])) > 0:
-        return
-
     activity_id = "https://{}/activities/{}".format(request.host, uuid.uuid4())
 
     message = {
         "@context": "https://www.w3.org/ns/activitystreams",
         "type": "Announce",
-        "to": ["https://{}/actor/followers".format(request.host)],
+        "to": ["https://{}/followers".format(request.host)],
         "actor": "https://{}/actor".format(request.host),
         "object": object_id,
         "id": activity_id
@@ -194,7 +228,7 @@ async def handle_relay(actor, data, request):
 
     logging.debug('>> relay: %r', message)
 
-    inboxes = distill_inboxes(actor)
+    inboxes = distill_inboxes(actor, object_id)
 
     futures = [push_message_to_actor({'inbox': inbox}, message, 'https://{}/actor#main-key'.format(request.host)) for inbox in inboxes]
     asyncio.ensure_future(asyncio.gather(*futures))
@@ -202,21 +236,37 @@ async def handle_relay(actor, data, request):
     CACHE[object_id] = activity_id
 
 
+async def handle_forward(actor, data, request):
+    object_id = distill_object_id(data)
+
+    logging.debug('>> Relay %r', data)
+
+    inboxes = distill_inboxes(actor, object_id)
+
+    futures = [
+        push_message_to_actor(
+            {'inbox': inbox},
+            data,
+            'https://{}/actor#main-key'.format(request.host))
+        for inbox in inboxes]
+    asyncio.ensure_future(asyncio.gather(*futures))
+
+
 async def handle_follow(actor, data, request):
     global DATABASE
 
     following = DATABASE.get('relay-list', [])
     inbox = get_actor_inbox(actor)
 
-    if urllib.parse.urlsplit(inbox).hostname in AP_CONFIG['blocked_instances']:
+
+    if urlsplit(inbox).hostname in AP_CONFIG['blocked_instances']:
         return
 
     if inbox not in following:
         following += [inbox]
         DATABASE['relay-list'] = following
 
-        if data['object'].endswith('/actor'):
-            asyncio.ensure_future(follow_remote_actor(actor['id']))
+        asyncio.ensure_future(follow_remote_actor(actor['id']))
 
     message = {
         "@context": "https://www.w3.org/ns/activitystreams",
@@ -226,10 +276,10 @@ async def handle_follow(actor, data, request):
 
         # this is wrong per litepub, but mastodon < 2.4 is not compliant with that profile.
         "object": {
-             "type": "Follow",
-             "id": data["id"],
-             "object": "https://{}/actor".format(request.host),
-             "actor": actor["id"]
+            "type": "Follow",
+            "id": data["id"],
+            "object": "https://{}/actor".format(request.host),
+            "actor": actor["id"]
         },
 
         "id": "https://{}/activities/{}".format(request.host, uuid.uuid4()),
@@ -251,24 +301,38 @@ async def handle_undo(actor, data, request):
             following.remove(inbox)
             DATABASE['relay-list'] = following
 
-        if child['object'].endswith('/actor'):
-            await unfollow_remote_actor(actor['id'])
+        await unfollow_remote_actor(actor['id'])
 
 
 processors = {
     'Announce': handle_relay,
     'Create': handle_relay,
+    'Delete': handle_forward,
     'Follow': handle_follow,
-    'Undo': handle_undo
+    'Undo': handle_undo,
+    'Update': handle_forward,
 }
 
 
 async def inbox(request):
     data = await request.json()
+    instance = urlsplit(data['actor']).hostname
+
+    if AP_CONFIG['blocked_software']:
+        software = await fetch_nodeinfo(instance)
+
+        if software and software.lower() in AP_CONFIG['blocked_software']:
+            raise aiohttp.web.HTTPUnauthorized(body='relays have been blocked', content_type='text/plain')
 
     if 'actor' not in data or not request['validated']:
         raise aiohttp.web.HTTPUnauthorized(body='access denied', content_type='text/plain')
 
+    elif data['type'] != 'Follow' and 'https://{}/inbox'.format(instance) not in DATABASE['relay-list']:
+        raise aiohttp.web.HTTPUnauthorized(body='access denied', content_type='text/plain')
+
+    elif AP_CONFIG['whitelist_enabled'] is True and instance not in AP_CONFIG['whitelist']:
+        raise aiohttp.web.HTTPUnauthorized(body='access denied', content_type='text/plain')
+
     actor = await fetch_actor(data["actor"])
     actor_uri = 'https://{}/actor'.format(request.host)
 
@@ -280,5 +344,4 @@ async def inbox(request):
 
     return aiohttp.web.Response(body=b'{}', content_type='application/activity+json')
 
-
 app.router.add_post('/inbox', inbox)