services/healthcheck/data/matrix

56 lines
1.4 KiB
Python
Executable file

#!/usr/bin/python3
import datetime
import os
import requests
import sys
import asyncio
from nio import AsyncClient, RoomMessageNotice
healthcheck_url = "https://hc-ping.com/" + os.environ['MATRIX_HC_UID']
def send_ping(success, msg=""):
url = healthcheck_url
if not success:
url += "/fail"
requests.get(url, data=msg, headers={'user-agent': os.environ['USER_AGENT']})
async def main():
try:
client = AsyncClient(os.environ['MATRIX_SERVER'])
client.access_token = os.environ['MATRIX_TOKEN']
client.device_id = os.environ['USER_AGENT']
await client.room_send(
room_id = os.environ['MATRIX_ROOM'],
message_type = "m.room.message",
content = {
"msgtype": "m.text",
"body": "!ping"
}
)
except Exception as e:
print(e)
print("exception during login or sending")
send_ping(False)
sys.exit(1)
await client.close()
url = "https://federationtester.matrix.org/api/report?server_name=" \
+ os.environ['MATRIX_SERVER_FEDTESTER']
resp = requests.get(url)
data = resp.json() # Check the JSON Response Content documentation below
if data["FederationOK"] != True:
send_ping(False)
sys.exit(1)
requests.get(url=healthcheck_url)
send_ping(True)
sys.exit(0)
asyncio.new_event_loop().run_until_complete(main())