Fix wrong URI for the relay's followers collection
[relay.git/.git] / relay / actor.py
index 4e010d7..ad09ecb 100644 (file)
@@ -3,12 +3,15 @@ import aiohttp.web
 import asyncio
 import logging
 import uuid
-import urllib.parse
-import simplejson as json
 import re
+import simplejson as json
 import cgi
+from urllib.parse import urlsplit
 from Crypto.PublicKey import RSA
 from .database import DATABASE
+from .http_debug import http_debug
+
+from cachetools import LFUCache
 
 
 # generate actor keys if not present
@@ -19,8 +22,8 @@ if "actorKeys" not in DATABASE:
     pubkey = privkey.publickey()
 
     DATABASE["actorKeys"] = {
-        "publicKey": pubkey.exportKey('PEM'),
-        "privateKey": privkey.exportKey('PEM')
+        "publicKey": pubkey.exportKey('PEM').decode('utf-8'),
+        "privateKey": privkey.exportKey('PEM').decode('utf-8')
     }
 
 
@@ -32,7 +35,11 @@ from . import app, CONFIG
 from .remote_actor import fetch_actor
 
 
-AP_CONFIG = CONFIG.get('ap', {'host': 'localhost'})
+AP_CONFIG = CONFIG.get('ap', {'host': 'localhost','blocked_instances':[]})
+CACHE_SIZE = CONFIG.get('cache-size', 16384)
+
+
+CACHE = LFUCache(CACHE_SIZE)
 
 
 async def actor(request):
@@ -44,7 +51,6 @@ async def actor(request):
         "followers": "https://{}/followers".format(request.host),
         "following": "https://{}/following".format(request.host),
         "inbox": "https://{}/inbox".format(request.host),
-        "sharedInbox": "https://{}/inbox".format(request.host),
         "name": "ActivityRelay",
         "type": "Application",
         "id": "https://{}/actor".format(request.host),
@@ -72,7 +78,7 @@ get_actor_inbox = lambda actor: actor.get('endpoints', {}).get('sharedInbox', ac
 async def push_message_to_actor(actor, message, our_key_id):
     inbox = get_actor_inbox(actor)
 
-    url = urllib.parse.urlsplit(inbox)
+    url = urlsplit(inbox)
 
     # XXX: Digest
     data = json.dumps(message)
@@ -83,19 +89,28 @@ async def push_message_to_actor(actor, message, our_key_id):
         'User-Agent': 'ActivityRelay'
     }
     headers['signature'] = sign_headers(headers, PRIVKEY, our_key_id)
+    headers.pop('(request-target)')
 
     logging.debug('%r >> %r', inbox, message)
 
-    async with aiohttp.ClientSession() as session:
-        async with session.post(inbox, data=data, headers=headers) as resp:
-            resp_payload = await resp.text()
-            logging.debug('%r >> resp %r', inbox, resp_payload)
+    try:
+        async with aiohttp.ClientSession(trace_configs=[http_debug()]) as session:
+            async with session.post(inbox, data=data, headers=headers) as resp:
+                if resp.status == 202:
+                    return
+                resp_payload = await resp.text()
+                logging.debug('%r >> resp %r', inbox, resp_payload)
+    except Exception as e:
+        logging.info('Caught %r while pushing to %r.', e, inbox)
 
 
 async def follow_remote_actor(actor_uri):
-    logging.info('following: %r', actor_uri)
-
     actor = await fetch_actor(actor_uri)
+    if not actor:
+        logging.info('failed to fetch actor at: %r', actor_uri)
+        return
+
+    logging.info('following: %r', actor_uri)
 
     message = {
         "@context": "https://www.w3.org/ns/activitystreams",
@@ -109,9 +124,12 @@ async def follow_remote_actor(actor_uri):
 
 
 async def unfollow_remote_actor(actor_uri):
-    logging.info('unfollowing: %r', actor_uri)
-
     actor = await fetch_actor(actor_uri)
+    if not actor:
+        logging.info('failed to fetch actor at: %r', actor_uri)
+        return
+
+    logging.info('unfollowing: %r', actor_uri)
 
     message = {
         "@context": "https://www.w3.org/ns/activitystreams",
@@ -135,13 +153,18 @@ def strip_html(data):
     return cgi.escape(no_tags)
 
 
-def distill_inboxes(actor):
+def distill_inboxes(actor, object_id):
     global DATABASE
 
+    origin_hostname = urlsplit(object_id).hostname
+
     inbox = get_actor_inbox(actor)
     targets = [target for target in DATABASE.get('relay-list', []) if target != inbox]
+    targets = [target for target in targets if urlsplit(target).hostname != origin_hostname]
+    hostnames = [urlsplit(target).hostname for target in targets]
 
     assert inbox not in targets
+    assert origin_hostname not in hostnames
 
     return targets
 
@@ -157,24 +180,34 @@ def distill_object_id(activity):
 
 
 async def handle_relay(actor, data, request):
+    global CACHE
+
     object_id = distill_object_id(data)
 
+    if object_id in CACHE:
+        logging.debug('>> already relayed %r as %r', object_id, CACHE[object_id])
+        return
+
+    activity_id = "https://{}/activities/{}".format(request.host, uuid.uuid4())
+
     message = {
         "@context": "https://www.w3.org/ns/activitystreams",
         "type": "Announce",
-        "to": ["https://{}/actor/followers".format(request.host)],
+        "to": ["https://{}/followers".format(request.host)],
         "actor": "https://{}/actor".format(request.host),
         "object": object_id,
-        "id": "https://{}/activities/{}".format(request.host, uuid.uuid4())
+        "id": activity_id
     }
 
     logging.debug('>> relay: %r', message)
 
-    inboxes = distill_inboxes(actor)
+    inboxes = distill_inboxes(actor, object_id)
 
     futures = [push_message_to_actor({'inbox': inbox}, message, 'https://{}/actor#main-key'.format(request.host)) for inbox in inboxes]
     asyncio.ensure_future(asyncio.gather(*futures))
 
+    CACHE[object_id] = activity_id
+
 
 async def handle_follow(actor, data, request):
     global DATABASE
@@ -182,10 +215,16 @@ async def handle_follow(actor, data, request):
     following = DATABASE.get('relay-list', [])
     inbox = get_actor_inbox(actor)
 
+    if urlsplit(inbox).hostname in AP_CONFIG['blocked_instances']:
+        return
+
     if inbox not in following:
         following += [inbox]
         DATABASE['relay-list'] = following
 
+        if data['object'].endswith('/actor'):
+            asyncio.ensure_future(follow_remote_actor(actor['id']))
+
     message = {
         "@context": "https://www.w3.org/ns/activitystreams",
         "type": "Accept",
@@ -205,9 +244,6 @@ async def handle_follow(actor, data, request):
 
     asyncio.ensure_future(push_message_to_actor(actor, message, 'https://{}/actor#main-key'.format(request.host)))
 
-    if data['object'].endswith('/actor'):
-        asyncio.ensure_future(follow_remote_actor(actor['id']))
-
 
 async def handle_undo(actor, data, request):
     global DATABASE