6974ef51e41e7b0f99c4995e1c55b5d850c34404
[relay.git/.git] / relay / actor.py
1 import aiohttp
2 import aiohttp.web
3 import asyncio
4 import logging
5 import uuid
6 import re
7 import simplejson as json
8 import cgi
9 from urllib.parse import urlsplit
10 from Crypto.PublicKey import RSA
11 from .database import DATABASE
12 from .http_debug import http_debug
13
14 from cachetools import LFUCache
15
16
# First start-up: no keypair is stored yet, so generate a 4096-bit RSA
# keypair once and store both halves in DATABASE as PEM text.
if "actorKeys" not in DATABASE:
    logging.info("No actor keys present, generating 4096-bit RSA keypair.")

    privkey = RSA.generate(4096)
    pubkey = privkey.publickey()

    DATABASE["actorKeys"] = dict(
        publicKey=pubkey.exportKey('PEM').decode('utf-8'),
        privateKey=privkey.exportKey('PEM').decode('utf-8'),
    )

# Always (re)load the signing key from DATABASE, whether it was generated
# just now or stored earlier.
PRIVKEY = RSA.importKey(DATABASE["actorKeys"]["privateKey"])
PUBKEY = PRIVKEY.publickey()

# Bounds how many outbound deliveries run at once (push_message_to_actor
# acquires it around each POST).
sem = asyncio.Semaphore(500)
34
35 from . import app, CONFIG
36 from .remote_actor import fetch_actor
37
38
# ActivityPub settings (this module reads 'host', 'whitelist_enabled',
# 'whitelist' and 'blocked_instances' from it).
AP_CONFIG = CONFIG['ap']

# How many relayed object IDs to remember for de-duplication; overridable
# through the top-level 'cache-size' config key.
CACHE_SIZE = CONFIG.get('cache-size', 16384)

# Least-frequently-used cache: relayed object ID -> ID of the Announce sent.
CACHE = LFUCache(maxsize=CACHE_SIZE)
44
45
async def actor(request):
    """Serve the relay's own ActivityPub actor document as JSON.

    Every URL in the document is derived from the Host the request arrived
    on; the public key is the PEM stored under DATABASE["actorKeys"].
    """
    base = "https://{}".format(request.host)
    actor_id = base + "/actor"
    inbox_url = base + "/inbox"

    document = {
        "@context": "https://www.w3.org/ns/activitystreams",
        "endpoints": {"sharedInbox": inbox_url},
        "followers": base + "/followers",
        "following": base + "/following",
        "inbox": inbox_url,
        "name": "ActivityRelay",
        "type": "Application",
        "id": actor_id,
        "publicKey": {
            "id": actor_id + "#main-key",
            "owner": actor_id,
            "publicKeyPem": DATABASE["actorKeys"]["publicKey"],
        },
        "summary": "ActivityRelay bot",
        "preferredUsername": "relay",
        "url": actor_id,
    }
    return aiohttp.web.json_response(document)


app.router.add_get('/actor', actor)
71
72
73 from .http_signatures import sign_headers
74
75
def get_actor_inbox(actor):
    """Return the inbox URL that deliveries for *actor* should be POSTed to.

    Prefers the actor's ``endpoints.sharedInbox`` and falls back to its own
    ``inbox``.  The fallback is only looked up when needed, so an actor with
    a shared inbox but no ``inbox`` key still works.  A missing, null or
    non-object ``endpoints`` (or a null/empty ``sharedInbox``) falls back to
    ``inbox``.

    Raises:
        KeyError: if there is no usable shared inbox and no ``inbox`` key.
    """
    endpoints = actor.get('endpoints')
    if isinstance(endpoints, dict):
        shared_inbox = endpoints.get('sharedInbox')
        if shared_inbox:
            return shared_inbox
    return actor['inbox']
77
78
async def push_message_to_actor(actor, message, our_key_id):
    """POST *message* to *actor*'s inbox, signed with the relay's key.

    Parameters:
        actor: actor document (dict); delivery goes to its shared inbox when
            advertised, otherwise to its ``inbox`` (see get_actor_inbox).
        message: the activity to deliver (JSON-serialisable).
        our_key_id: key ID placed in the Signature header.

    Delivery is best-effort: any exception is logged at INFO level and
    swallowed.  Concurrency is bounded by the module-level ``sem``.
    """
    inbox = get_actor_inbox(actor)
    url = urlsplit(inbox)

    # The signed (request-target) must match the request line the server
    # sees: the path plus any query string, and "/" for an empty path.
    target = url.path or '/'
    if url.query:
        target = '{}?{}'.format(target, url.query)

    # XXX: no Digest header is sent yet.
    # Serialise once to bytes so Content-Length is a byte count rather than
    # a character count.
    data = json.dumps(message).encode('utf-8')
    headers = {
        '(request-target)': 'post {}'.format(target),
        'Content-Length': str(len(data)),
        'Content-Type': 'application/activity+json',
        'User-Agent': 'ActivityRelay'
    }
    headers['signature'] = sign_headers(headers, PRIVKEY, our_key_id)
    # (request-target) is a signing pseudo-header, not a real HTTP header.
    headers.pop('(request-target)')

    logging.debug('%r >> %r', inbox, message)

    async with sem:
        try:
            async with aiohttp.ClientSession(trace_configs=[http_debug()]) as session:
                async with session.post(inbox, data=data, headers=headers) as resp:
                    if resp.status == 202:
                        return
                    resp_payload = await resp.text()
                    logging.debug('%r >> resp %r', inbox, resp_payload)
        except Exception as e:
            logging.info('Caught %r while pushing to %r.', e, inbox)
107
108
async def follow_remote_actor(actor_uri):
    """Send a Follow from the relay's actor to the actor at *actor_uri*.

    When the whitelist is enabled, actors on non-whitelisted hosts are
    refused before any network request is made.  If the actor document
    cannot be fetched, the failure is logged and nothing is sent.
    """
    # Check the whitelist first: it only needs the URI, so there is no
    # reason to fetch an actor we are going to refuse anyway.
    if AP_CONFIG['whitelist_enabled'] is True and urlsplit(actor_uri).hostname not in AP_CONFIG['whitelist']:
        logging.info('refusing to follow non-whitelisted actor: %r', actor_uri)
        return

    actor = await fetch_actor(actor_uri)
    if not actor:
        logging.info('failed to fetch actor at: %r', actor_uri)
        return

    logging.info('following: %r', actor_uri)

    our_actor = "https://{}/actor".format(AP_CONFIG['host'])
    message = {
        "@context": "https://www.w3.org/ns/activitystreams",
        "type": "Follow",
        "to": [actor['id']],
        "object": actor['id'],
        "id": "https://{}/activities/{}".format(AP_CONFIG['host'], uuid.uuid4()),
        "actor": our_actor
    }
    await push_message_to_actor(actor, message, our_actor + "#main-key")
131
132
async def unfollow_remote_actor(actor_uri):
    """Send an Undo{Follow} from the relay's actor to the actor at *actor_uri*.

    If the actor document cannot be fetched, the failure is logged and
    nothing is sent.
    """
    actor = await fetch_actor(actor_uri)
    if not actor:
        logging.info('failed to fetch actor at: %r', actor_uri)
        return

    logging.info('unfollowing: %r', actor_uri)

    our_actor = "https://{}/actor".format(AP_CONFIG['host'])
    message = {
        "@context": "https://www.w3.org/ns/activitystreams",
        "type": "Undo",
        "to": [actor['id']],
        "object": {
            # The Follow being undone is the one the relay sent, so its
            # actor is the relay and its object is the remote actor.  (It
            # previously named the remote actor as the follower.)
            "type": "Follow",
            "object": actor_uri,
            "actor": our_actor,
            # NOTE(review): a fresh ID, not the original Follow's ID -- the
            # relay does not appear to store the IDs of Follows it sends.
            "id": "https://{}/activities/{}".format(AP_CONFIG['host'], uuid.uuid4())
        },
        "id": "https://{}/activities/{}".format(AP_CONFIG['host'], uuid.uuid4()),
        "actor": our_actor
    }
    await push_message_to_actor(actor, message, our_actor + "#main-key")
155
156
# Matches an HTML comment or any single tag.
tag_re = re.compile(r'(<!--.*?-->|<[^>]*>)')


def strip_html(data):
    """Remove HTML tags and comments from *data*, then escape the remainder.

    Escapes ``&``, ``<`` and ``>`` but not quotes.  This uses
    ``html.escape(..., quote=False)``, the documented replacement for
    ``cgi.escape``, which was removed in Python 3.8 and made this function
    raise AttributeError.
    """
    import html

    no_tags = tag_re.sub('', data)
    return html.escape(no_tags, quote=False)
161
162
def distill_inboxes(actor, object_id):
    """Return the subscribed inboxes an activity should be delivered to.

    Starts from DATABASE's 'relay-list' and leaves out the sender's own
    inbox and every inbox on the same host as *object_id*.
    """
    origin = urlsplit(object_id).hostname
    sender_inbox = get_actor_inbox(actor)

    return [
        candidate
        for candidate in DATABASE.get('relay-list', [])
        if candidate != sender_inbox and urlsplit(candidate).hostname != origin
    ]
177
178
def distill_object_id(activity):
    """Return the ID of *activity*'s ``object``.

    The object may be a bare ID string or an embedded object carrying an
    ``id`` key.
    """
    target = activity['object']
    logging.debug('>> determining object ID for %r', target)
    return target if isinstance(target, str) else target['id']
187
188
async def handle_relay(actor, data, request):
    """Announce the activity's object to every other subscribed inbox.

    Each object ID is relayed at most once while it remains in ``CACHE``,
    which maps it to the ID of the Announce the relay sent.  Deliveries are
    scheduled in the background and not awaited here.
    """
    object_id = distill_object_id(data)

    if object_id in CACHE:
        logging.debug('>> already relayed %r as %r', object_id, CACHE[object_id])
        return

    host = request.host
    activity_id = "https://{}/activities/{}".format(host, uuid.uuid4())
    announce = {
        "@context": "https://www.w3.org/ns/activitystreams",
        "type": "Announce",
        "to": ["https://{}/followers".format(host)],
        "actor": "https://{}/actor".format(host),
        "object": object_id,
        "id": activity_id,
    }
    logging.debug('>> relay: %r', announce)

    key_id = 'https://{}/actor#main-key'.format(host)
    deliveries = []
    for target in distill_inboxes(actor, object_id):
        deliveries.append(push_message_to_actor({'inbox': target}, announce, key_id))
    asyncio.ensure_future(asyncio.gather(*deliveries))

    CACHE[object_id] = activity_id
217
218
async def handle_forward(actor, data, request):
    """Forward *data* unchanged to every other subscribed inbox.

    Deliveries are scheduled in the background and not awaited here.
    """
    object_id = distill_object_id(data)
    logging.debug('>> Relay %r', data)

    key_id = 'https://{}/actor#main-key'.format(request.host)
    deliveries = [
        push_message_to_actor({'inbox': target}, data, key_id)
        for target in distill_inboxes(actor, object_id)
    ]
    asyncio.ensure_future(asyncio.gather(*deliveries))
233
234
async def handle_follow(actor, data, request):
    """Subscribe the sender's inbox to the relay and reply with an Accept.

    Follows from inboxes on a blocked instance are silently ignored.  A new
    subscriber is appended to DATABASE's 'relay-list' and followed back; an
    Accept is sent in the background either way.
    """
    subscribers = DATABASE.get('relay-list', [])
    sender_inbox = get_actor_inbox(actor)

    if urlsplit(sender_inbox).hostname in AP_CONFIG['blocked_instances']:
        return

    if sender_inbox not in subscribers:
        subscribers += [sender_inbox]
        DATABASE['relay-list'] = subscribers
        # Follow the new subscriber back (see follow_remote_actor).
        asyncio.ensure_future(follow_remote_actor(actor['id']))

    our_actor = "https://{}/actor".format(request.host)
    accept = {
        "@context": "https://www.w3.org/ns/activitystreams",
        "type": "Accept",
        "to": [actor["id"]],
        "actor": our_actor,
        # Embedding the Follow is wrong per litepub, but mastodon < 2.4 does
        # not comply with that profile.
        "object": {
            "type": "Follow",
            "id": data["id"],
            "object": our_actor,
            "actor": actor["id"],
        },
        "id": "https://{}/activities/{}".format(request.host, uuid.uuid4()),
    }

    asyncio.ensure_future(push_message_to_actor(actor, accept, '{}#main-key'.format(our_actor)))
268
269
async def handle_undo(actor, data, request):
    """Handle an Undo activity; only an Undo of an embedded Follow is acted on.

    For Undo{Follow}, the sender's inbox is removed from DATABASE's
    'relay-list' (if present) and the relay unfollows the sender in turn.
    Undo activities whose ``object`` is missing, is a bare ID string, or is
    not a Follow are ignored.  Previously a string ``object`` raised
    TypeError on ``child['type']``.
    """
    child = data.get('object')
    if not isinstance(child, dict) or child.get('type') != 'Follow':
        return

    following = DATABASE.get('relay-list', [])
    inbox = get_actor_inbox(actor)

    if inbox in following:
        following.remove(inbox)
        DATABASE['relay-list'] = following

    await unfollow_remote_actor(actor['id'])
284
285
# Dispatch table: incoming activity "type" -> handler coroutine.  Types not
# listed here are accepted by the inbox but not processed further.
processors = dict(
    Announce=handle_relay,
    Create=handle_relay,
    Delete=handle_forward,
    Follow=handle_follow,
    Undo=handle_undo,
    Update=handle_forward,
)
294
295
async def inbox(request):
    """Receive an activity POSTed to the relay's shared inbox.

    Responds 400 for a body that is not valid JSON.  Responds 401 when:

    * the body has no string ``actor``;
    * the request was not marked as validated;
    * the activity is not a Follow and the sender's instance has not
      subscribed;
    * the whitelist is enabled and the sender's instance is not on it.

    Known activity types are dispatched through ``processors``; success
    always returns ``{}``.
    """
    try:
        data = await request.json()
    except ValueError:
        raise aiohttp.web.HTTPBadRequest(body='invalid json', content_type='text/plain') from None

    actor_uri = data.get('actor') if isinstance(data, dict) else None

    # Check the actor is present *before* parsing it: the old order raised
    # KeyError (HTTP 500) for a missing actor instead of returning 401.
    if not isinstance(actor_uri, str) or not request['validated']:
        raise aiohttp.web.HTTPUnauthorized(body='access denied', content_type='text/plain')

    instance = urlsplit(actor_uri).hostname
    activity_type = data.get('type')

    # Only a Follow may come from an instance that has not subscribed yet.
    # 'relay-list' does not exist until the first subscription.
    if activity_type != 'Follow' and 'https://{}/inbox'.format(instance) not in DATABASE.get('relay-list', []):
        raise aiohttp.web.HTTPUnauthorized(body='access denied', content_type='text/plain')

    if AP_CONFIG['whitelist_enabled'] is True and instance not in AP_CONFIG['whitelist']:
        raise aiohttp.web.HTTPUnauthorized(body='access denied', content_type='text/plain')

    logging.debug(">> payload %r", data)

    processor = processors.get(activity_type)
    if processor is not None:
        # Only fetch the actor when a handler will use it.
        actor = await fetch_actor(actor_uri)
        if not actor:
            # Every processor needs the actor document.  Answer with a 5xx
            # (as the unhandled exception did before) so the failure stays
            # server-side.
            logging.info('failed to fetch actor at: %r', actor_uri)
            raise aiohttp.web.HTTPBadGateway(body='failed to fetch actor', content_type='text/plain')
        await processor(actor, data, request)

    return aiohttp.web.Response(body=b'{}', content_type='application/activity+json')


app.router.add_post('/inbox', inbox)