import asyncio
import logging
import uuid
-import urllib.parse
-import simplejson as json
import re
+import simplejson as json
import cgi
+from urllib.parse import urlsplit
from Crypto.PublicKey import RSA
from .database import DATABASE
+from .http_debug import http_debug
+
+from cachetools import LFUCache
# generate actor keys if not present
pubkey = privkey.publickey()
DATABASE["actorKeys"] = {
- "publicKey": pubkey.exportKey('PEM'),
- "privateKey": privkey.exportKey('PEM')
+ "publicKey": pubkey.exportKey('PEM').decode('utf-8'),
+ "privateKey": privkey.exportKey('PEM').decode('utf-8')
}
from .remote_actor import fetch_actor
-AP_CONFIG = CONFIG.get('ap', {'host': 'localhost'})
+AP_CONFIG = CONFIG.get('ap', {'host': 'localhost','blocked_instances':[]})
+CACHE_SIZE = CONFIG.get('cache-size', 16384)
+
+
+CACHE = LFUCache(CACHE_SIZE)
async def actor(request):
"followers": "https://{}/followers".format(request.host),
"following": "https://{}/following".format(request.host),
"inbox": "https://{}/inbox".format(request.host),
- "sharedInbox": "https://{}/inbox".format(request.host),
"name": "ActivityRelay",
"type": "Application",
"id": "https://{}/actor".format(request.host),
async def push_message_to_actor(actor, message, our_key_id):
inbox = get_actor_inbox(actor)
- url = urllib.parse.urlsplit(inbox)
+ url = urlsplit(inbox)
# XXX: Digest
data = json.dumps(message)
'User-Agent': 'ActivityRelay'
}
headers['signature'] = sign_headers(headers, PRIVKEY, our_key_id)
+ headers.pop('(request-target)')
logging.debug('%r >> %r', inbox, message)
- async with aiohttp.ClientSession() as session:
- async with session.post(inbox, data=data, headers=headers) as resp:
- resp_payload = await resp.text()
- logging.debug('%r >> resp %r', inbox, resp_payload)
+ try:
+ async with aiohttp.ClientSession(trace_configs=[http_debug()]) as session:
+ async with session.post(inbox, data=data, headers=headers) as resp:
+ if resp.status == 202:
+ return
+ resp_payload = await resp.text()
+ logging.debug('%r >> resp %r', inbox, resp_payload)
+ except Exception as e:
+ logging.info('Caught %r while pushing to %r.', e, inbox)
async def follow_remote_actor(actor_uri):
- logging.info('following: %r', actor_uri)
-
actor = await fetch_actor(actor_uri)
+ if not actor:
+ logging.info('failed to fetch actor at: %r', actor_uri)
+ return
+
+ logging.info('following: %r', actor_uri)
message = {
"@context": "https://www.w3.org/ns/activitystreams",
async def unfollow_remote_actor(actor_uri):
- logging.info('unfollowing: %r', actor_uri)
-
actor = await fetch_actor(actor_uri)
+ if not actor:
+ logging.info('failed to fetch actor at: %r', actor_uri)
+ return
+
+ logging.info('unfollowing: %r', actor_uri)
message = {
"@context": "https://www.w3.org/ns/activitystreams",
return cgi.escape(no_tags)
-async def handle_create(actor, data, request):
- pass
def distill_inboxes(actor, object_id):
    """Return the relay-list inboxes that should receive a relayed object.

    Excludes the submitting actor's own inbox and every inbox hosted on
    the same domain as the object itself, so an activity is never echoed
    back to its origin.

    :param actor: the actor document the activity arrived from
    :param object_id: URI of the object being relayed
    :returns: list of inbox URLs to push to
    """
    origin_hostname = urlsplit(object_id).hostname
    inbox = get_actor_inbox(actor)

    # One pass replaces the original two comprehensions; the old
    # `assert`s were tautologies of these very conditions (and would be
    # stripped under `python -O` anyway), so they are dropped.
    return [
        target for target in DATABASE.get('relay-list', [])
        if target != inbox and urlsplit(target).hostname != origin_hostname
    ]
+
+
def distill_object_id(activity):
    """Return the object ID of *activity*.

    The ActivityPub ``object`` field may be either a bare URI string or
    an embedded object; in the latter case its ``id`` field is used.
    """
    target = activity['object']
    logging.debug('>> determining object ID for %r', target)

    return target if isinstance(target, str) else target['id']
+
+
async def handle_relay(actor, data, request):
    """Relay an incoming activity by announcing its object to all other
    subscribed inboxes.

    The object ID is recorded in the LFU cache so the same object is
    announced at most once, however many instances deliver it to us.
    """
    object_id = distill_object_id(data)

    # Duplicate suppression: bail out if we have already relayed this object.
    if object_id in CACHE:
        logging.debug('>> already relayed %r as %r', object_id, CACHE[object_id])
        return

    activity_id = "https://{}/activities/{}".format(request.host, uuid.uuid4())

    message = {
        "@context": "https://www.w3.org/ns/activitystreams",
        "type": "Announce",
        "to": ["https://{}/followers".format(request.host)],
        "actor": "https://{}/actor".format(request.host),
        "object": object_id,
        "id": activity_id
    }

    logging.debug('>> relay: %r', message)

    key_id = 'https://{}/actor#main-key'.format(request.host)
    pushes = [
        push_message_to_actor({'inbox': target}, message, key_id)
        for target in distill_inboxes(actor, object_id)
    ]
    # Fire-and-forget: deliveries proceed concurrently in the background.
    asyncio.ensure_future(asyncio.gather(*pushes))

    CACHE[object_id] = activity_id
async def handle_follow(actor, data, request):
following = DATABASE.get('relay-list', [])
inbox = get_actor_inbox(actor)
+ if urlsplit(inbox).hostname in AP_CONFIG['blocked_instances']:
+ return
+
if inbox not in following:
following += [inbox]
DATABASE['relay-list'] = following
+ if data['object'].endswith('/actor'):
+ asyncio.ensure_future(follow_remote_actor(actor['id']))
+
message = {
"@context": "https://www.w3.org/ns/activitystreams",
"type": "Accept",
asyncio.ensure_future(push_message_to_actor(actor, message, 'https://{}/actor#main-key'.format(request.host)))
- if data['object'].endswith('/actor'):
- asyncio.ensure_future(follow_remote_actor(actor['id']))
-
async def handle_undo(actor, data, request):
global DATABASE
processors = {
- 'Create': handle_create,
+ 'Announce': handle_relay,
+ 'Create': handle_relay,
'Follow': handle_follow,
'Undo': handle_undo
}