Apply 1 suggestion(s) to 1 file(s)
[relay.git/.git] / relay / actor.py
1 import aiohttp
2 import aiohttp.web
3 import asyncio
4 import logging
5 import uuid
6 import re
7 import simplejson as json
8 import cgi
9 import datetime
10
11 from urllib.parse import urlsplit
12 from Crypto.PublicKey import RSA
13 from cachetools import LFUCache
14
15 from . import app, CONFIG
16 from .database import DATABASE
17 from .http_debug import http_debug
18 from .remote_actor import fetch_actor
19 from .http_signatures import sign_headers, generate_body_digest
20
21
22 # generate actor keys if not present
23 if "actorKeys" not in DATABASE:
24     logging.info("No actor keys present, generating 4096-bit RSA keypair.")
25
26     privkey = RSA.generate(4096)
27     pubkey = privkey.publickey()
28
29     DATABASE["actorKeys"] = {
30         "publicKey": pubkey.exportKey('PEM').decode('utf-8'),
31         "privateKey": privkey.exportKey('PEM').decode('utf-8')
32     }
33
34
35 PRIVKEY = RSA.importKey(DATABASE["actorKeys"]["privateKey"])
36 PUBKEY = PRIVKEY.publickey()
37 AP_CONFIG = CONFIG['ap']
38 CACHE_SIZE = CONFIG.get('cache-size', 16384)
39 CACHE = LFUCache(CACHE_SIZE)
40
41 sem = asyncio.Semaphore(500)
42
43
async def actor(request):
    """Serve the relay's own ActivityPub actor document.

    Every URL in the document is built from the Host of the incoming
    request, so the relay describes itself under whichever hostname it was
    reached at. The public key is read from the stored actor keypair.
    """
    host = request.host
    self_id = "https://{}/actor".format(host)

    public_key = {
        "id": "https://{}/actor#main-key".format(host),
        "owner": "https://{}/actor".format(host),
        "publicKeyPem": DATABASE["actorKeys"]["publicKey"],
    }

    document = {
        "@context": "https://www.w3.org/ns/activitystreams",
        "endpoints": {"sharedInbox": "https://{}/inbox".format(host)},
        "followers": "https://{}/followers".format(host),
        "following": "https://{}/following".format(host),
        "inbox": "https://{}/inbox".format(host),
        "name": "ActivityRelay",
        "type": "Application",
        "id": self_id,
        "publicKey": public_key,
        "summary": "ActivityRelay bot",
        "preferredUsername": "relay",
        "url": self_id,
    }
    return aiohttp.web.json_response(document, content_type='application/activity+json')
66
67
# Serve the relay actor document (see actor()) at GET /actor.
app.router.add_get('/actor', actor)
def get_actor_inbox(actor):
    """Return the inbox URL that deliveries to *actor* should go to.

    Prefers the shared inbox (``endpoints.sharedInbox``) and falls back to
    the actor's own ``inbox``.

    :param actor: actor document (a dict), or a minimal ``{'inbox': url}``
        dict as built elsewhere in this module.
    :returns: the inbox URL.
    :raises KeyError: if there is no usable shared inbox and ``inbox`` is
        missing.
    """
    endpoints = actor.get('endpoints')
    # ``endpoints`` is not guaranteed to be an object here, and
    # ``sharedInbox`` may be absent or null; use it only when it's a real value.
    if isinstance(endpoints, dict):
        shared_inbox = endpoints.get('sharedInbox')
        if shared_inbox:
            return shared_inbox
    # Looked up lazily, so an actor with a shared inbox but no 'inbox' key
    # no longer raises.
    return actor['inbox']
70
71
async def push_message_to_actor(actor, message, our_key_id):
    """Sign *message* with the relay key and POST it to *actor*'s inbox.

    Delivery is best-effort: any exception during the request is logged and
    swallowed, so one unreachable peer does not abort a fan-out. At most as
    many deliveries as the module semaphore allows run at once.

    :param actor: actor document or ``{'inbox': url}``; the shared inbox is
        preferred (see ``get_actor_inbox``).
    :param message: ActivityPub activity to send (JSON-serialisable dict).
    :param our_key_id: key ID put into the HTTP signature.
    """
    from email.utils import formatdate

    inbox = get_actor_inbox(actor)
    url = urlsplit(inbox)

    # The signed (request-target) must match the request line that is
    # actually sent, which includes the query string and uses "/" for an
    # empty path.
    target = url.path or '/'
    if url.query:
        target += '?' + url.query

    data = json.dumps(message)
    headers = {
        '(request-target)': 'post {}'.format(target),
        # Content-Length is a byte count of the UTF-8 body, not a char count.
        'Content-Length': str(len(data.encode('utf-8'))),
        'Content-Type': 'application/activity+json',
        'User-Agent': 'ActivityRelay',
        'Host': url.netloc,
        'Digest': 'SHA-256={}'.format(generate_body_digest(data)),
        # RFC 7231 IMF-fixdate. Unlike strftime('%a ... %b'), formatdate always
        # uses English day/month names regardless of the process locale.
        'Date': formatdate(usegmt=True),
    }
    headers['signature'] = sign_headers(headers, PRIVKEY, our_key_id)
    # These entries exist only to be covered by the signature; aiohttp derives
    # the real Host header from the URL.
    headers.pop('(request-target)')
    headers.pop('Host')

    logging.debug('%r >> %r', inbox, message)

    async with sem:
        try:
            async with aiohttp.ClientSession(trace_configs=[http_debug()]) as session:
                async with session.post(inbox, data=data, headers=headers) as resp:
                    if resp.status == 202:
                        return
                    resp_payload = await resp.text()
                    logging.debug('%r >> resp %r', inbox, resp_payload)
        except Exception as e:
            # Deliberate best-effort boundary: log and move on.
            logging.info('Caught %r while pushing to %r.', e, inbox)
104
105
async def fetch_nodeinfo(domain):
    """Return the software name *domain* reports via NodeInfo, or ``None``.

    Reads ``https://<domain>/.well-known/nodeinfo``, follows the link for the
    NodeInfo 2.0 schema (or 2.1 if 2.0 is not advertised) and returns
    ``software.name`` from that document. Any missing or malformed piece
    yields ``None`` rather than an exception, because the caller uses the
    result only as an optional filter.
    """
    headers = {'Accept': 'application/activity+json'}
    # Schemas in order of preference; 2.0 is the one this relay always used.
    schemas = (
        'http://nodeinfo.diaspora.software/ns/schema/2.0',
        'http://nodeinfo.diaspora.software/ns/schema/2.1',
    )

    wk_nodeinfo = await fetch_actor(f'https://{domain}/.well-known/nodeinfo', headers=headers)
    if not wk_nodeinfo:
        return None

    hrefs = {}
    for link in wk_nodeinfo.get('links') or []:
        if not isinstance(link, dict):
            continue
        rel, href = link.get('rel'), link.get('href')
        if rel in schemas and href:
            # Keep the first href advertised for each schema.
            hrefs.setdefault(rel, href)

    nodeinfo_url = next((hrefs[schema] for schema in schemas if schema in hrefs), None)
    if not nodeinfo_url:
        return None

    nodeinfo_data = await fetch_actor(nodeinfo_url, headers=headers)
    # Previously an unreachable NodeInfo document caused AttributeError here.
    if not nodeinfo_data:
        return None

    software = nodeinfo_data.get('software')
    if not isinstance(software, dict):
        return None

    name = software.get('name')
    return name if isinstance(name, str) else None
127
128
async def follow_remote_actor(actor_uri):
    """Send a Follow from the relay actor to the actor at *actor_uri*.

    Nothing is sent (only logged) when whitelisting is enabled and the
    actor's host is not whitelisted, or when the actor cannot be fetched.
    """
    # Check the whitelist first so non-whitelisted hosts are never contacted.
    if AP_CONFIG['whitelist_enabled'] is True and urlsplit(actor_uri).hostname not in AP_CONFIG['whitelist']:
        logging.info('refusing to follow non-whitelisted actor: %r', actor_uri)
        return

    actor = await fetch_actor(actor_uri)
    if not actor:
        logging.info('failed to fetch actor at: %r', actor_uri)
        return

    logging.info('following: %r', actor_uri)

    host = AP_CONFIG['host']
    message = {
        "@context": "https://www.w3.org/ns/activitystreams",
        "type": "Follow",
        "to": [actor['id']],
        "object": actor['id'],
        "id": "https://{}/activities/{}".format(host, uuid.uuid4()),
        "actor": "https://{}/actor".format(host)
    }
    await push_message_to_actor(actor, message, "https://{}/actor#main-key".format(host))
151
152
async def unfollow_remote_actor(actor_uri):
    """Send an Undo(Follow) from the relay actor to the actor at *actor_uri*.

    The ID of the original Follow is not stored, so the embedded Follow is
    rebuilt with a fresh ID. Its actor/object mirror what
    ``follow_remote_actor`` sends: the relay follows the remote actor.
    """
    actor = await fetch_actor(actor_uri)
    if not actor:
        logging.info('failed to fetch actor at: %r', actor_uri)
        return

    logging.info('unfollowing: %r', actor_uri)

    host = AP_CONFIG['host']
    relay_actor = "https://{}/actor".format(host)
    message = {
        "@context": "https://www.w3.org/ns/activitystreams",
        "type": "Undo",
        "to": [actor['id']],
        "object": {
            "type": "Follow",
            # The relay is the follower and the remote actor the followee.
            "object": actor['id'],
            "actor": relay_actor,
            "id": "https://{}/activities/{}".format(host, uuid.uuid4())
        },
        "id": "https://{}/activities/{}".format(host, uuid.uuid4()),
        "actor": relay_actor
    }
    await push_message_to_actor(actor, message, "https://{}/actor#main-key".format(host))
175
176
# Matches HTML comments (non-greedy) and tags.
tag_re = re.compile(r'(<!--.*?-->|<[^>]*>)')


def strip_html(data):
    """Remove HTML tags and comments from *data* and escape the remainder.

    ``&``, ``<`` and ``>`` are escaped; quote characters are left untouched.
    """
    # cgi.escape was removed in Python 3.8; html.escape(quote=False) is the
    # replacement with identical output.
    import html

    no_tags = tag_re.sub('', data)
    return html.escape(no_tags, quote=False)
181
182
def distill_inboxes(actor, object_id):
    """Return the subscribed inboxes that an object should be relayed to.

    Drops the inbox of the *actor* that delivered the activity and every
    inbox whose host equals the host of *object_id*, so content is not sent
    back to its origin. Order follows the stored relay list.
    """
    origin = urlsplit(object_id).hostname
    sender_inbox = get_actor_inbox(actor)

    selected = []
    for candidate in DATABASE.get('relay-list', []):
        if candidate == sender_inbox:
            continue
        if urlsplit(candidate).hostname == origin:
            continue
        selected.append(candidate)

    # Sanity checks: both exclusions hold for the result.
    assert sender_inbox not in selected
    assert origin not in {urlsplit(entry).hostname for entry in selected}

    return selected
197
198
def distill_object_id(activity):
    """Return the ID of *activity*'s ``object``.

    The object is either referenced by a bare ID string or embedded as a
    dict with an ``id`` key; both forms yield the ID.
    """
    target = activity['object']
    logging.debug('>> determining object ID for %r', target)
    return target if isinstance(target, str) else target['id']
207
208
async def handle_relay(actor, data, request):
    """Announce the object of *data* to every eligible subscribed inbox.

    An object whose ID is already in CACHE is skipped, so it is announced at
    most once while it remains cached. Deliveries are scheduled in the
    background; this coroutine does not wait for them.
    """
    object_id = distill_object_id(data)

    if object_id in CACHE:
        logging.debug('>> already relayed %r as %r', object_id, CACHE[object_id])
        return

    host = request.host
    activity_id = "https://{}/activities/{}".format(host, uuid.uuid4())
    announce = {
        "@context": "https://www.w3.org/ns/activitystreams",
        "type": "Announce",
        "to": ["https://{}/followers".format(host)],
        "actor": "https://{}/actor".format(host),
        "object": object_id,
        "id": activity_id
    }
    logging.debug('>> relay: %r', announce)

    targets = distill_inboxes(actor, object_id)
    key_id = 'https://{}/actor#main-key'.format(host)
    deliveries = [push_message_to_actor({'inbox': target}, announce, key_id) for target in targets]
    asyncio.ensure_future(asyncio.gather(*deliveries))

    # Remember which Announce was issued for this object.
    CACHE[object_id] = activity_id
237
238
async def handle_forward(actor, data, request):
    """Send *data* unchanged to every eligible subscribed inbox.

    Registered in ``processors`` for Delete and Update; the activity is sent
    as-is rather than wrapped in an Announce. Deliveries run in the
    background.
    """
    object_id = distill_object_id(data)

    logging.debug('>> Relay %r', data)

    pending = []
    for target in distill_inboxes(actor, object_id):
        key_id = 'https://{}/actor#main-key'.format(request.host)
        pending.append(push_message_to_actor({'inbox': target}, data, key_id))
    asyncio.ensure_future(asyncio.gather(*pending))
253
254
async def handle_follow(actor, data, request):
    """Subscribe *actor*'s inbox to the relay and accept its Follow.

    Inboxes on blocked instances are ignored without a reply. A new inbox is
    appended to the relay list and the relay follows the actor back. An
    Accept is sent in every non-blocked case, including repeated Follows.
    """
    subscribers = DATABASE.get('relay-list', [])
    inbox = get_actor_inbox(actor)

    if urlsplit(inbox).hostname in AP_CONFIG['blocked_instances']:
        return

    if inbox not in subscribers:
        # Extends the list in place, then writes it back to DATABASE
        # (presumably so the database layer persists it — verify).
        subscribers += [inbox]
        DATABASE['relay-list'] = subscribers
        asyncio.ensure_future(follow_remote_actor(actor['id']))

    host = request.host
    accept = {
        "@context": "https://www.w3.org/ns/activitystreams",
        "type": "Accept",
        "to": [actor["id"]],
        "actor": "https://{}/actor".format(host),

        # this is wrong per litepub, but mastodon < 2.4 is not compliant with that profile.
        "object": {
            "type": "Follow",
            "id": data["id"],
            "object": "https://{}/actor".format(host),
            "actor": actor["id"]
        },

        "id": "https://{}/activities/{}".format(host, uuid.uuid4()),
    }

    key_id = 'https://{}/actor#main-key'.format(host)
    asyncio.ensure_future(push_message_to_actor(actor, accept, key_id))
289
290
async def handle_undo(actor, data, request):
    """Handle an Undo activity; only Undo(Follow) is acted upon.

    Removes *actor*'s inbox from the relay list (if present) and sends an
    Undo(Follow) back so the relay stops following the actor as well. An
    Undo whose object is not an embedded Follow is ignored.
    """
    child = data.get('object')
    # The object may be a bare activity ID string rather than an embedded
    # object. Its type can't be checked then, so ignore it instead of
    # raising TypeError on child['type'].
    if not isinstance(child, dict) or child.get('type') != 'Follow':
        return

    following = DATABASE.get('relay-list', [])
    inbox = get_actor_inbox(actor)

    if inbox in following:
        following.remove(inbox)
        DATABASE['relay-list'] = following

    await unfollow_remote_actor(actor['id'])
305
306
# Dispatch table: incoming activity "type" -> handler coroutine. inbox()
# acknowledges activities of any other type without processing them.
processors = dict(
    Announce=handle_relay,
    Create=handle_relay,
    Delete=handle_forward,
    Follow=handle_follow,
    Undo=handle_undo,
    Update=handle_forward,
)
315
316
async def inbox(request):
    """Receive an activity POSTed to the relay's shared inbox.

    Checks run cheapest-first, so no outbound request (NodeInfo, actor fetch)
    happens for a request that fails a local check:

    1. body must be a JSON object with a string ``actor`` and
       ``request['validated']`` must be truthy (presumably set by
       signature-checking middleware — verify);
    2. non-Follow activities must come from an instance already in the
       relay list;
    3. the instance must be whitelisted when whitelisting is on;
    4. the instance must not run blocked software.

    Accepted activities are dispatched through ``processors``; other types
    are acknowledged and ignored.

    :raises aiohttp.web.HTTPBadRequest: body is not a JSON object, or the
        sending actor cannot be fetched.
    :raises aiohttp.web.HTTPUnauthorized: the request fails a check above.
    """
    try:
        data = await request.json()
    except ValueError:
        # Covers malformed JSON and undecodable bodies (previously a 500).
        raise aiohttp.web.HTTPBadRequest(body='invalid json', content_type='text/plain') from None

    if not isinstance(data, dict):
        raise aiohttp.web.HTTPBadRequest(body='invalid json', content_type='text/plain')

    # Validate before touching data['actor'] (previously read first, so a
    # missing actor produced KeyError/500 instead of 401).
    if not isinstance(data.get('actor'), str) or not request.get('validated'):
        raise aiohttp.web.HTTPUnauthorized(body='access denied', content_type='text/plain')

    instance = urlsplit(data['actor']).hostname
    activity_type = data.get('type')

    if activity_type != 'Follow' and 'https://{}/inbox'.format(instance) not in DATABASE.get('relay-list', []):
        raise aiohttp.web.HTTPUnauthorized(body='access denied', content_type='text/plain')

    if AP_CONFIG['whitelist_enabled'] is True and instance not in AP_CONFIG['whitelist']:
        raise aiohttp.web.HTTPUnauthorized(body='access denied', content_type='text/plain')

    if AP_CONFIG['blocked_software']:
        software = await fetch_nodeinfo(instance)

        if isinstance(software, str) and software.lower() in AP_CONFIG['blocked_software']:
            raise aiohttp.web.HTTPUnauthorized(body='relays have been blocked', content_type='text/plain')

    actor = await fetch_actor(data['actor'])
    if not actor:
        # The handlers all dereference the actor document; fail cleanly
        # instead of crashing inside them.
        logging.info('failed to fetch actor at: %r', data['actor'])
        raise aiohttp.web.HTTPBadRequest(body='failed to fetch actor', content_type='text/plain')

    logging.debug(">> payload %r", data)

    processor = processors.get(activity_type)
    if processor:
        await processor(actor, data, request)

    return aiohttp.web.Response(body=b'{}', content_type='application/activity+json')

app.router.add_post('/inbox', inbox)