a8d661344fd227d70e862fedc88c6222fdc9d40f
[relay.git/.git] / relay / actor.py
1 import aiohttp
2 import aiohttp.web
3 import asyncio
4 import logging
5 import uuid
6 import re
7 import simplejson as json
8 import cgi
9 from urllib.parse import urlsplit
10 from Crypto.PublicKey import RSA
11 from .database import DATABASE
12 from .http_debug import http_debug
13
14 from cachetools import LFUCache
15
16
# Create the relay actor's RSA keypair the first time the relay starts and
# store both halves as PEM text in DATABASE.
if "actorKeys" not in DATABASE:
    logging.info("No actor keys present, generating 4096-bit RSA keypair.")

    privkey = RSA.generate(4096)
    pubkey = privkey.publickey()

    DATABASE["actorKeys"] = dict(
        publicKey=pubkey.exportKey('PEM').decode('utf-8'),
        privateKey=privkey.exportKey('PEM').decode('utf-8'),
    )


# The private key is always (re)loaded from storage; the public half is
# derived from it rather than parsed from the stored PEM.
# NOTE(review): kept above the `from . import app, CONFIG` import that follows
# — presumably because those modules import names from here; confirm.
PRIVKEY = RSA.importKey(DATABASE["actorKeys"]["privateKey"])
PUBKEY = PRIVKEY.publickey()
32
33
34 from . import app, CONFIG
35 from .remote_actor import fetch_actor
36
37
# ActivityPub settings; without an 'ap' section the relay assumes it is
# served as 'localhost' and blocks no instances.
AP_CONFIG = CONFIG.get(
    'ap',
    dict(host='localhost', blocked_instances=[]),
)

# Upper bound on how many relayed object IDs are remembered (see handle_relay,
# which uses CACHE to skip objects it has already announced).
CACHE_SIZE = CONFIG.get('cache-size', 16384)
CACHE = LFUCache(maxsize=CACHE_SIZE)
43
44
async def actor(request):
    """Serve the relay's own ActivityPub actor document as JSON.

    Every URL in the document is derived from the Host the client used, so
    the document matches whichever hostname the relay was reached under.
    The public key PEM is read from DATABASE["actorKeys"].
    """
    origin = "https://{}".format(request.host)
    actor_id = origin + "/actor"

    document = {
        "@context": "https://www.w3.org/ns/activitystreams",
        "endpoints": {"sharedInbox": origin + "/inbox"},
        "followers": origin + "/followers",
        "following": origin + "/following",
        "inbox": origin + "/inbox",
        "name": "ActivityRelay",
        "type": "Application",
        "id": actor_id,
        "publicKey": {
            "id": actor_id + "#main-key",
            "owner": actor_id,
            "publicKeyPem": DATABASE["actorKeys"]["publicKey"],
        },
        "summary": "ActivityRelay bot",
        "preferredUsername": "relay",
        "url": actor_id,
    }
    return aiohttp.web.json_response(document)


app.router.add_get('/actor', actor)
70
71
72 from .http_signatures import sign_headers
73
74
def get_actor_inbox(actor):
    """Return the inbox URL that deliveries to *actor* should be POSTed to.

    The shared inbox advertised under ``endpoints.sharedInbox`` is preferred.
    Otherwise the actor's own ``inbox`` is used. ``actor['inbox']`` is only
    looked up when no usable shared inbox exists, so an actor that advertises
    just a shared inbox does not need an ``inbox`` key.

    :param actor: actor document (dict).
    :raises KeyError: when neither a shared inbox nor ``inbox`` is present.
    """
    endpoints = actor.get('endpoints')
    # Remote documents may carry a null or non-object 'endpoints' value.
    shared_inbox = endpoints.get('sharedInbox') if isinstance(endpoints, dict) else None
    if shared_inbox:
        return shared_inbox
    return actor['inbox']
76
77
async def push_message_to_actor(actor, message, our_key_id):
    """POST *message* to *actor*'s inbox, HTTP-signed with the relay's key.

    :param actor: actor document; only its inbox fields are read.
    :param message: JSON-serialisable activity to deliver.
    :param our_key_id: key id placed in the Signature header.

    Delivery is best-effort: any exception is logged at INFO and swallowed,
    so one unreachable inbox never propagates an error to the caller.
    Non-2xx responses are logged at DEBUG together with their body.
    """
    inbox = get_actor_inbox(actor)

    url = urlsplit(inbox)
    # (request-target) covers the request line's path *and* query; signing
    # only the path breaks verification for inbox URLs with a query string.
    request_path = url.path or '/'
    if url.query:
        request_path = '{}?{}'.format(request_path, url.query)

    # XXX: Digest
    # Encode once so Content-Length counts bytes, not characters.
    body = json.dumps(message).encode('utf-8')
    headers = {
        '(request-target)': 'post {}'.format(request_path),
        'Content-Length': str(len(body)),
        'Content-Type': 'application/activity+json',
        'User-Agent': 'ActivityRelay'
    }
    headers['signature'] = sign_headers(headers, PRIVKEY, our_key_id)
    # The pseudo-header only exists for signing; it is not a real header.
    headers.pop('(request-target)')

    logging.debug('%r >> %r', inbox, message)

    try:
        async with aiohttp.ClientSession(trace_configs=[http_debug()]) as session:
            async with session.post(inbox, data=body, headers=headers) as resp:
                # Any 2xx is a successful delivery, not only 202 Accepted.
                if 200 <= resp.status < 300:
                    return
                resp_payload = await resp.text()
                logging.debug('%r >> resp %d %r', inbox, resp.status, resp_payload)
    except Exception as e:
        logging.info('Caught %r while pushing to %r.', e, inbox)
105
106
async def follow_remote_actor(actor_uri):
    """Send a Follow from this relay's actor to the actor at *actor_uri*.

    The remote actor document is fetched first. If that fails, the failure is
    logged and nothing is sent. The relay's actor URL and key id are built
    from ``AP_CONFIG['host']``.
    """
    remote = await fetch_actor(actor_uri)
    if remote:
        logging.info('following: %r', actor_uri)

        relay_host = AP_CONFIG['host']
        our_actor = "https://{}/actor".format(relay_host)
        follow = {
            "@context": "https://www.w3.org/ns/activitystreams",
            "type": "Follow",
            "to": [remote['id']],
            "object": remote['id'],
            "id": "https://{}/activities/{}".format(relay_host, uuid.uuid4()),
            "actor": our_actor,
        }
        await push_message_to_actor(remote, follow, our_actor + "#main-key")
        return

    logging.info('failed to fetch actor at: %r', actor_uri)
124
125
async def unfollow_remote_actor(actor_uri):
    """Send an Undo of this relay's Follow to the actor at *actor_uri*.

    The remote actor document is fetched first. If that fails, the failure is
    logged and nothing is sent. The embedded Follow mirrors the one built by
    ``follow_remote_actor``: the relay's actor follows ``actor['id']``.
    """
    actor = await fetch_actor(actor_uri)
    if not actor:
        logging.info('failed to fetch actor at: %r', actor_uri)
        return

    logging.info('unfollowing: %r', actor_uri)

    our_actor = "https://{}/actor".format(AP_CONFIG['host'])
    message = {
        "@context": "https://www.w3.org/ns/activitystreams",
        "type": "Undo",
        "to": [actor['id']],
        "object": {
             "type": "Follow",
             # The relay was the follower and the remote actor the followee.
             "object": actor['id'],
             "actor": our_actor,
             # NOTE(review): the original Follow's id is not stored anywhere
             # visible here, so a fresh id is minted.
             "id": "https://{}/activities/{}".format(AP_CONFIG['host'], uuid.uuid4())
        },
        "id": "https://{}/activities/{}".format(AP_CONFIG['host'], uuid.uuid4()),
        "actor": our_actor
    }
    await push_message_to_actor(actor, message, our_actor + "#main-key")
148
149
# Matches HTML comments and tags. DOTALL lets a comment span lines even when
# it contains '>'.
tag_re = re.compile(r'(<!--.*?-->|<[^>]*>)', re.DOTALL)


def strip_html(data):
    """Remove HTML tags and comments from *data* and escape the remaining text.

    ``&``, ``<`` and ``>`` are escaped. Quotes are left untouched, matching
    the former ``cgi.escape`` default.
    """
    import html  # cgi.escape no longer exists on Python 3.8+

    no_tags = tag_re.sub('', data)
    return html.escape(no_tags, quote=False)
154
155
def distill_inboxes(actor, object_id):
    """Return the subscribed inboxes that should receive a relay of *object_id*.

    This is every inbox in ``DATABASE['relay-list']`` except two kinds:
    the sender's own inbox, and any inbox whose hostname equals the hostname
    of *object_id* (presumably so the object is not echoed back to its
    origin). The original list order is kept.
    """
    source_host = urlsplit(object_id).hostname
    sender_inbox = get_actor_inbox(actor)

    def _is_recipient(candidate):
        # Cheap equality test first; only parse URLs that survive it.
        return candidate != sender_inbox and urlsplit(candidate).hostname != source_host

    return [candidate for candidate in DATABASE.get('relay-list', []) if _is_recipient(candidate)]
170
171
def distill_object_id(activity):
    """Return the ID of the object that *activity* refers to.

    ``activity['object']`` may be either a bare ID string or an embedded
    object, in which case its ``'id'`` is returned.
    """
    target = activity['object']
    logging.debug('>> determining object ID for %r', target)
    return target if isinstance(target, str) else target['id']
180
181
async def handle_relay(actor, data, request):
    """Announce the object of an incoming activity to all subscribed inboxes.

    Object IDs already present in CACHE are skipped, so each object is
    relayed at most once while it stays cached. Recipients come from
    ``distill_inboxes``. Deliveries are scheduled in the background and not
    awaited here.
    """
    object_id = distill_object_id(data)

    if object_id in CACHE:
        logging.debug('>> already relayed %r as %r', object_id, CACHE[object_id])
        return

    our_actor = "https://{}/actor".format(request.host)
    activity_id = "https://{}/activities/{}".format(request.host, uuid.uuid4())

    message = {
        "@context": "https://www.w3.org/ns/activitystreams",
        "type": "Announce",
        # Address the followers collection the actor document advertises
        # ("https://<host>/followers"), not an unadvertised URL.
        "to": ["https://{}/followers".format(request.host)],
        "actor": our_actor,
        "object": object_id,
        "id": activity_id
    }

    logging.debug('>> relay: %r', message)

    inboxes = distill_inboxes(actor, object_id)

    key_id = our_actor + '#main-key'
    futures = [push_message_to_actor({'inbox': inbox}, message, key_id) for inbox in inboxes]
    asyncio.ensure_future(asyncio.gather(*futures))

    # No await occurs between the CACHE check and this write, so concurrent
    # handlers cannot relay the same object twice.
    CACHE[object_id] = activity_id
210
211
async def handle_follow(actor, data, request):
    """Subscribe the sender's inbox to the relay and reply with an Accept.

    Inboxes whose hostname is listed in ``AP_CONFIG['blocked_instances']``
    are ignored silently, and no Accept is sent for them. A new inbox is
    appended to ``DATABASE['relay-list']``. If the Follow's object ends in
    ``/actor``, a Follow back to the sender is also scheduled. An Accept is
    scheduled for every non-blocked Follow, repeats included.
    """
    following = DATABASE.get('relay-list', [])
    inbox = get_actor_inbox(actor)

    # A custom 'ap' config section need not define 'blocked_instances'.
    blocked = AP_CONFIG.get('blocked_instances') or []
    if urlsplit(inbox).hostname in blocked:
        return

    if inbox not in following:
        following += [inbox]
        DATABASE['relay-list'] = following

        # The Follow's object may be a bare URI or an embedded object;
        # distill_object_id handles both forms.
        if distill_object_id(data).endswith('/actor'):
            asyncio.ensure_future(follow_remote_actor(actor['id']))

    message = {
        "@context": "https://www.w3.org/ns/activitystreams",
        "type": "Accept",
        "to": [actor["id"]],
        "actor": "https://{}/actor".format(request.host),

        # this is wrong per litepub, but mastodon < 2.4 is not compliant with that profile.
        "object": {
             "type": "Follow",
             "id": data["id"],
             "object": "https://{}/actor".format(request.host),
             "actor": actor["id"]
        },

        "id": "https://{}/activities/{}".format(request.host, uuid.uuid4()),
    }

    asyncio.ensure_future(push_message_to_actor(actor, message, 'https://{}/actor#main-key'.format(request.host)))
246
247
async def handle_undo(actor, data, request):
    """Handle an Undo. For an undone Follow, unsubscribe the sender's inbox.

    Only an Undo whose object is an embedded ``Follow`` is acted on. If that
    Follow's object ends in ``/actor``, an Undo of the relay's own Follow of
    the sender is scheduled in the background, as ``handle_follow`` does
    for the follow-back.
    """
    child = data.get('object')
    # A bare URI (or a missing object) gives no type to check, so it is ignored.
    if not isinstance(child, dict) or child.get('type') != 'Follow':
        return

    following = DATABASE.get('relay-list', [])

    inbox = get_actor_inbox(actor)

    if inbox in following:
        following.remove(inbox)
        DATABASE['relay-list'] = following

    # The undone Follow's object may itself be embedded rather than a URI.
    followed = child.get('object')
    if isinstance(followed, dict):
        followed = followed.get('id')

    if isinstance(followed, str) and followed.endswith('/actor'):
        # Scheduled rather than awaited, so the inbox reply does not wait
        # on the remote fetch and delivery.
        asyncio.ensure_future(unfollow_remote_actor(actor['id']))
263
264
# Dispatch table: incoming activity type -> handler coroutine. `inbox` accepts
# activities of any other type but does nothing with them.
processors = dict(
    Announce=handle_relay,
    Create=handle_relay,
    Follow=handle_follow,
    Undo=handle_undo,
)
271
272
async def inbox(request):
    """Accept an activity POSTed to the shared inbox and dispatch it by type.

    Responses:
    - 400 when the body is not a JSON object.
    - 400 for a handled activity type whose sending actor cannot be fetched.
    - 401 when the activity has no ``actor``, or when
      ``request['validated']`` is missing or falsy (presumably set by
      signature-checking middleware; confirm).
    - Otherwise ``{}`` with status 200.
    """
    try:
        data = await request.json()
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError both subclass ValueError.
        raise aiohttp.web.HTTPBadRequest(body='invalid json', content_type='text/plain')

    if not isinstance(data, dict):
        raise aiohttp.web.HTTPBadRequest(body='invalid activity', content_type='text/plain')

    # .get(): a request the middleware never marked counts as unvalidated.
    if 'actor' not in data or not request.get('validated'):
        raise aiohttp.web.HTTPUnauthorized(body='access denied', content_type='text/plain')

    actor = await fetch_actor(data["actor"])

    logging.debug(">> payload %r", data)

    activity_type = data.get('type')
    # JSON-LD allows non-string 'type' values, which cannot be dict keys.
    processor = processors.get(activity_type) if isinstance(activity_type, str) else None
    if processor:
        # The handlers read fields of the actor document; fail cleanly
        # instead of crashing on None inside them.
        if not actor:
            logging.info('failed to fetch actor at: %r', data['actor'])
            raise aiohttp.web.HTTPBadRequest(body='failed to fetch actor', content_type='text/plain')
        await processor(actor, data, request)

    return aiohttp.web.Response(body=b'{}', content_type='application/activity+json')


app.router.add_post('/inbox', inbox)