#!/usr/bin/python3 import datetime import os import requests import sys import asyncio from nio import AsyncClient, RoomMessageNotice healthcheck_url = "https://hc-ping.com/" + os.environ['MATRIX_HC_UID'] def send_ping(success, msg=""): url = healthcheck_url if not success: url += "/fail" requests.get(url, data=msg, headers={'user-agent': os.environ['USER_AGENT']}) async def main(): try: client = AsyncClient(os.environ['MATRIX_SERVER']) client.access_token = os.environ['MATRIX_TOKEN'] client.device_id = os.environ['USER_AGENT'] await client.room_send( room_id = os.environ['MATRIX_ROOM'], message_type = "m.room.message", content = { "msgtype": "m.text", "body": "!ping" } ) except Exception as e: print(e) print("exception during login or sending") send_ping(False) sys.exit(1) await client.close() url = "https://federationtester.matrix.org/api/report?server_name=" \ + os.environ['MATRIX_SERVER_FEDTESTER'] resp = requests.get(url) data = resp.json() # Check the JSON Response Content documentation below if data["FederationOK"] != True: send_ping(False) sys.exit(1) requests.get(url=healthcheck_url) send_ping(True) sys.exit(0) asyncio.new_event_loop().run_until_complete(main())