Merge branch 'upstream'
[relay.git/.git] / relay / actor.py
1 import aiohttp
2 import aiohttp.web
3 import asyncio
4 import logging
5 import uuid
6 import re
7 import simplejson as json
8 import cgi
9 from urllib.parse import urlsplit
10 from Crypto.PublicKey import RSA
11 from .database import DATABASE
12 from .http_debug import http_debug
13
14 from cachetools import LFUCache
15
16
# Make sure the relay has an RSA keypair: when DATABASE has no "actorKeys"
# entry, generate one and store both halves there as PEM text.
if "actorKeys" not in DATABASE:
    logging.info("No actor keys present, generating 4096-bit RSA keypair.")

    privkey = RSA.generate(4096)
    pubkey = privkey.publickey()

    DATABASE["actorKeys"] = dict(
        publicKey=pubkey.exportKey('PEM').decode('utf-8'),
        privateKey=privkey.exportKey('PEM').decode('utf-8'),
    )


# Key objects used by the rest of the module; the public half is always
# derived from the stored private key.
PRIVKEY = RSA.importKey(DATABASE["actorKeys"]["privateKey"])
PUBKEY = PRIVKEY.publickey()
32
33
34 from . import app, CONFIG
35 from .remote_actor import fetch_actor
36
37
# ActivityPub settings.  The defaults are merged with whatever the 'ap'
# section of the configuration provides, so a partially filled (or empty)
# section still yields every key the handlers below read.
AP_CONFIG = {
    'host': 'localhost',
    'blocked_instances': [],
    'allowed_instances': [],
}
AP_CONFIG.update(CONFIG.get('ap') or {})

# Maximum number of entries kept in CACHE.
CACHE_SIZE = CONFIG.get('cache-size', 16384)


# Maps the id of an already relayed object to the id of the Announce activity
# generated for it (see handle_relay); least-frequently-used entries are
# evicted once CACHE_SIZE is reached.
CACHE = LFUCache(CACHE_SIZE)
47
48
async def actor(request):
    """Serve the relay's own ActivityPub actor document as JSON.

    Every URL in the document is built from ``request.host``; the public key
    is the PEM text stored in ``DATABASE["actorKeys"]``.
    """
    base_url = "https://{}".format(request.host)
    actor_url = base_url + "/actor"
    inbox_url = base_url + "/inbox"

    document = {
        "@context": "https://www.w3.org/ns/activitystreams",
        "endpoints": {
            "sharedInbox": inbox_url
        },
        "followers": base_url + "/followers",
        "following": base_url + "/following",
        "inbox": inbox_url,
        "name": "ActivityRelay",
        "type": "Application",
        "id": actor_url,
        "publicKey": {
            "id": actor_url + "#main-key",
            "owner": actor_url,
            "publicKeyPem": DATABASE["actorKeys"]["publicKey"]
        },
        "summary": "ActivityRelay bot",
        "preferredUsername": "relay",
        "url": actor_url
    }
    return aiohttp.web.json_response(document)


app.router.add_get('/actor', actor)
74
75
76 from .http_signatures import sign_headers
77
78
def get_actor_inbox(actor):
    """Return the inbox URL that deliveries for ``actor`` should go to.

    The shared inbox advertised under ``endpoints.sharedInbox`` is preferred;
    when it is absent or empty the actor's own ``inbox`` is used.  The
    fallback is only looked up when needed, so an actor document that
    carries just a shared inbox is accepted.

    Args:
        actor: mapping holding the actor document.

    Returns:
        The inbox URL.

    Raises:
        KeyError: if the actor has neither a shared inbox nor an ``inbox``.
    """
    # ``endpoints`` may be present but null in remote documents.
    endpoints = actor.get('endpoints') or {}
    shared_inbox = endpoints.get('sharedInbox')
    if shared_inbox:
        return shared_inbox
    return actor['inbox']
80
81
async def push_message_to_actor(actor, message, our_key_id):
    """POST ``message`` to the inbox of ``actor`` with a signed request.

    Delivery is best effort: a 202 response is accepted silently, any other
    response body is logged at debug level, and exceptions raised while
    sending are logged at info level and swallowed.

    Args:
        actor: actor document (mapping) used to resolve the target inbox.
        message: JSON-serialisable activity to deliver.
        our_key_id: key id passed to ``sign_headers`` for the signature.
    """
    inbox = get_actor_inbox(actor)
    request_target = 'post {}'.format(urlsplit(inbox).path)

    # XXX: Digest -- no Digest header is produced or signed yet.
    body = json.dumps(message)
    headers = {
        '(request-target)': request_target,
        'Content-Length': str(len(body)),
        'Content-Type': 'application/activity+json',
        'User-Agent': 'ActivityRelay'
    }
    headers['signature'] = sign_headers(headers, PRIVKEY, our_key_id)
    # The pseudo-header is only an input to the signature, never sent.
    del headers['(request-target)']

    logging.debug('%r >> %r', inbox, message)

    try:
        async with aiohttp.ClientSession(trace_configs=[http_debug()]) as session:
            async with session.post(inbox, data=body, headers=headers) as response:
                if response.status != 202:
                    resp_payload = await response.text()
                    logging.debug('%r >> resp %r', inbox, resp_payload)
    except Exception as exc:
        logging.info('Caught %r while pushing to %r.', exc, inbox)
109
110
async def follow_remote_actor(actor_uri):
    """Send a Follow activity from this relay to the actor at ``actor_uri``.

    The actor document is fetched first; if that fails the attempt is logged
    and nothing is sent.
    """
    actor = await fetch_actor(actor_uri)
    if not actor:
        logging.info('failed to fetch actor at: %r', actor_uri)
        return

    logging.info('following: %r', actor_uri)

    remote_id = actor['id']
    our_host = AP_CONFIG['host']

    message = {
        "@context": "https://www.w3.org/ns/activitystreams",
        "type": "Follow",
        "to": [remote_id],
        "object": remote_id,
        "id": "https://{}/activities/{}".format(our_host, uuid.uuid4()),
        "actor": "https://{}/actor".format(our_host)
    }
    key_id = "https://{}/actor#main-key".format(our_host)
    await push_message_to_actor(actor, message, key_id)
128
129
async def unfollow_remote_actor(actor_uri):
    """Send an Undo(Follow) activity from this relay to the actor at ``actor_uri``.

    The actor document is fetched first; if that fails the attempt is logged
    and nothing is sent.
    """
    actor = await fetch_actor(actor_uri)
    if not actor:
        logging.info('failed to fetch actor at: %r', actor_uri)
        return

    logging.info('unfollowing: %r', actor_uri)

    remote_id = actor['id']
    our_host = AP_CONFIG['host']

    # NOTE(review): the embedded Follow names the remote actor as its own
    # "actor", whereas follow_remote_actor sends the Follow with this relay
    # as actor -- confirm this is what receiving servers expect.
    undone_follow = {
        "type": "Follow",
        "object": actor_uri,
        "actor": remote_id,
        "id": "https://{}/activities/{}".format(our_host, uuid.uuid4())
    }
    message = {
        "@context": "https://www.w3.org/ns/activitystreams",
        "type": "Undo",
        "to": [remote_id],
        "object": undone_follow,
        "id": "https://{}/activities/{}".format(our_host, uuid.uuid4()),
        "actor": "https://{}/actor".format(our_host)
    }
    key_id = "https://{}/actor#main-key".format(our_host)
    await push_message_to_actor(actor, message, key_id)
152
153
# Matches HTML comments and anything that looks like a tag.
tag_re = re.compile(r'(<!--.*?-->|<[^>]*>)')


def strip_html(data):
    """Remove HTML tags/comments from ``data`` and escape what is left.

    ``&``, ``<`` and ``>`` in the remaining text are replaced by HTML
    entities; quotes are left untouched.

    Args:
        data: text that may contain HTML markup.

    Returns:
        The tag-free, entity-escaped text.
    """
    # cgi.escape() was removed in Python 3.8; html.escape(..., quote=False)
    # is its documented replacement with the same output.
    import html

    no_tags = tag_re.sub('', data)
    return html.escape(no_tags, quote=False)
158
159
def distill_inboxes(actor, object_id):
    """Select the subscribed inboxes an object should be relayed to.

    Starting from the 'relay-list' entry of DATABASE, the sending actor's
    own inbox and every inbox hosted on the object's origin host are left
    out.

    Args:
        actor: actor document (mapping) of the sender.
        object_id: URL identifying the object being relayed.

    Returns:
        List of inbox URLs, in 'relay-list' order.
    """
    origin_hostname = urlsplit(object_id).hostname
    sender_inbox = get_actor_inbox(actor)

    # Single pass; the hostname is only parsed for targets that are not the
    # sender's inbox.
    return [
        target
        for target in DATABASE.get('relay-list', [])
        if target != sender_inbox
        and urlsplit(target).hostname != origin_hostname
    ]
174
175
def distill_object_id(activity):
    """Return the id of the object an activity refers to.

    ``activity['object']`` is returned as-is when it is a string; otherwise
    its ``'id'`` member is returned.
    """
    target = activity['object']
    logging.debug('>> determining object ID for %r', target)
    return target if isinstance(target, str) else target['id']
184
185
async def handle_relay(actor, data, request):
    """Relay an incoming activity's object to the subscribed inboxes.

    An Announce of the object is built and pushed in the background to
    every inbox returned by ``distill_inboxes``.  Objects whose id is
    already in CACHE are skipped, so each object is announced once for as
    long as it stays cached.

    Args:
        actor: actor document (mapping) of the sender.
        data: the received activity.
        request: the incoming request; ``request.host`` is used to build
            this relay's URLs.
    """
    object_id = distill_object_id(data)

    if object_id in CACHE:
        logging.debug('>> already relayed %r as %r', object_id, CACHE[object_id])
        return

    base_url = "https://{}".format(request.host)
    activity_id = "{}/activities/{}".format(base_url, uuid.uuid4())

    message = {
        "@context": "https://www.w3.org/ns/activitystreams",
        "type": "Announce",
        # Address the followers collection the actor document advertises
        # ("https://<host>/followers").
        "to": [base_url + "/followers"],
        "actor": base_url + "/actor",
        "object": object_id,
        "id": activity_id
    }

    logging.debug('>> relay: %r', message)

    inboxes = distill_inboxes(actor, object_id)
    if inboxes:
        key_id = base_url + '/actor#main-key'
        deliveries = [
            push_message_to_actor({'inbox': inbox}, message, key_id)
            for inbox in inboxes
        ]
        # Fire and forget: the deliveries run after this handler returns.
        asyncio.ensure_future(asyncio.gather(*deliveries))

    CACHE[object_id] = activity_id
214
215
async def handle_follow(actor, data, request):
    """Subscribe the sender's inbox to the relay and accept the Follow.

    Follows from hosts listed in ``AP_CONFIG['blocked_instances']`` are
    ignored, as are follows from hosts outside a non-empty
    ``AP_CONFIG['allowed_instances']``.  For a new subscriber whose Follow
    targets a URL ending in ``/actor``, the relay follows the sender back.
    An Accept is pushed to the sender in the background in every
    non-ignored case.

    Args:
        actor: actor document (mapping) of the sender.
        data: the received Follow activity.
        request: the incoming request; ``request.host`` is used to build
            this relay's URLs.
    """
    following = DATABASE.get('relay-list', [])
    inbox = get_actor_inbox(actor)
    hostname = urlsplit(inbox).hostname

    if hostname in AP_CONFIG['blocked_instances']:
        return

    allowed_instances = AP_CONFIG['allowed_instances']
    if allowed_instances and hostname not in allowed_instances:
        return

    if inbox not in following:
        following.append(inbox)
        DATABASE['relay-list'] = following

        # The followed object may arrive as a bare id or as an embedded
        # object carrying an 'id'.
        followed = data['object']
        if isinstance(followed, dict):
            followed = followed.get('id')
        if isinstance(followed, str) and followed.endswith('/actor'):
            asyncio.ensure_future(follow_remote_actor(actor['id']))

    our_actor = "https://{}/actor".format(request.host)
    message = {
        "@context": "https://www.w3.org/ns/activitystreams",
        "type": "Accept",
        "to": [actor["id"]],
        "actor": our_actor,

        # this is wrong per litepub, but mastodon < 2.4 is not compliant with that profile.
        "object": {
             "type": "Follow",
             "id": data["id"],
             "object": our_actor,
             "actor": actor["id"]
        },

        "id": "https://{}/activities/{}".format(request.host, uuid.uuid4()),
    }

    asyncio.ensure_future(push_message_to_actor(actor, message, 'https://{}/actor#main-key'.format(request.host)))
254
255
async def handle_undo(actor, data, request):
    """Handle an Undo activity; only Undo of a Follow has any effect.

    The sender's inbox is removed from the 'relay-list' entry of DATABASE,
    and when the undone Follow targeted a URL ending in ``/actor`` the relay
    unfollows the sender in turn.

    Args:
        actor: actor document (mapping) of the sender.
        data: the received Undo activity.
        request: the incoming request (unused).
    """
    child = data['object']
    # An Undo that references its object by bare id carries no type to
    # inspect, so there is nothing to act on.
    if not isinstance(child, dict) or child.get('type') != 'Follow':
        return

    following = DATABASE.get('relay-list', [])
    inbox = get_actor_inbox(actor)

    if inbox in following:
        following.remove(inbox)
        DATABASE['relay-list'] = following

    # The followed object may be a bare id or an embedded object with 'id'.
    followed = child.get('object')
    if isinstance(followed, dict):
        followed = followed.get('id')
    if isinstance(followed, str) and followed.endswith('/actor'):
        await unfollow_remote_actor(actor['id'])
271
272
# Dispatch table used by the inbox handler: activity type -> coroutine that
# processes it.  Activity types without an entry are not processed.
processors = {
    # Both kinds of content activity are relayed the same way.
    'Announce': handle_relay,
    'Create': handle_relay,
    # Subscription management.
    'Follow': handle_follow,
    'Undo': handle_undo,
}
279
280
async def inbox(request):
    """Receive an activity POSTed to the relay's inbox and dispatch it.

    The payload must be a JSON object naming an ``actor`` and the request
    must have been marked ``validated``; otherwise it is rejected with
    401.  Activities whose ``type`` has an entry in ``processors`` are
    handed to that coroutine together with the fetched actor document; all
    other types are acknowledged without processing.

    Returns:
        An empty JSON object response.

    Raises:
        aiohttp.web.HTTPUnauthorized: for unvalidated or malformed
            requests, or when the sender's actor document cannot be
            fetched for an activity that needs processing.
    """
    data = await request.json()

    # request.get() so that a request nobody marked as validated is denied
    # instead of failing with a KeyError.
    if not isinstance(data, dict) or 'actor' not in data or not request.get('validated'):
        raise aiohttp.web.HTTPUnauthorized(body='access denied', content_type='text/plain')

    actor = await fetch_actor(data["actor"])

    logging.debug(">> payload %r", data)

    processor = processors.get(data.get('type'))
    if processor:
        if not actor:
            # Every processor reads the actor document; refuse rather than
            # crash inside the handler.
            logging.info('failed to fetch actor at: %r', data["actor"])
            raise aiohttp.web.HTTPUnauthorized(body='access denied', content_type='text/plain')
        await processor(actor, data, request)

    return aiohttp.web.Response(body=b'{}', content_type='application/activity+json')


app.router.add_post('/inbox', inbox)