import requests
from pyspark.sql import Row
import json
from datetime import datetime
def after_all(df, spark, app, *args, **kwargs):
    """Append current Telegram community statistics to ``df``.

    For every chat in ``chat_ids`` the Telegram Bot API is queried for the
    member count (``getChatMemberCount``) and the chat title (``getChat``).
    One row ``[timestamp, members_count, title]`` is built per chat, the rows
    are turned into a DataFrame with ``spark.createDataFrame`` and unioned
    onto ``df``.

    API failures are best-effort: the error is printed and the affected value
    stays ``None`` so that the remaining chats are still collected.

    Args:
        df: DataFrame the new rows are unioned onto. ``union`` matches columns
            by position, so ``df`` presumably has three columns compatible
            with (timestamp, count, title) -- confirm against the caller.
        spark: Session object used to call ``createDataFrame``.
        app: Not used by this hook.
        *args: Not used by this hook.
        **kwargs: Not used by this hook.

    Returns:
        ``df`` with one extra row per chat id, or ``df`` unchanged when there
        are no chat ids to process.
    """
    # Telegram Bot API settings -- replace with your own values.
    # NOTE(review): the bot token is a secret; keeping it in source code is
    # risky. Consider loading it from configuration or the environment.
    bot_token = '64:AAEet'
    chat_ids = ["@awcommunity", "@awbi_ru"]
    # Without a timeout ``requests`` can block forever on a stalled connection.
    request_timeout = 10

    def redact(text):
        """Mask the bot token, which is part of every request URL."""
        return str(text).replace(bot_token, '<bot_token>')

    def call_bot_api(method, chat_id):
        """Call a Bot API method for ``chat_id``.

        Returns the decoded JSON body, or ``None`` (after printing the error)
        when the request fails, the status is not OK or the body is not JSON.
        """
        url = f'https://api.telegram.org/bot{bot_token}/{method}'
        try:
            # ``params`` lets requests URL-encode the chat id.
            response = requests.get(
                url, params={'chat_id': chat_id}, timeout=request_timeout
            )
            if not response.ok:
                # The URL embeds the token, so redact it before logging.
                print(redact(
                    f'\nОшибка URL: {response.url} \n '
                    f'{response.status_code}: {response.text}'
                ))
                return None
            return response.json()
        except (requests.RequestException, ValueError) as e:
            # Connection errors mention the URL (and thus the token) as well.
            print(redact(e))
            return None

    def get_community_stat(chat_id):
        """Return ``[timestamp, member_count]``; the count is None on failure."""
        reply = call_bot_api('getChatMemberCount', chat_id)
        member_count = reply.get('result') if reply else None
        return [datetime.now(), member_count]

    def get_telegram_chat_title(chat_id):
        """Return the chat title, or None if unavailable."""
        reply = call_bot_api('getChat', chat_id)
        if not reply:
            return None
        # Not every chat type carries a title, so avoid a KeyError here.
        return (reply.get('result') or {}).get('title')

    result_rows = []
    for chat_id in chat_ids:
        current_date, members_count = get_community_stat(chat_id)
        telegram_chat_title = get_telegram_chat_title(chat_id)
        result_rows.append([current_date, members_count, telegram_chat_title])
    print(result_rows)

    # createDataFrame cannot infer a schema from an empty list.
    if not result_rows:
        return df

    # NOTE(review): the schema is inferred from the rows; if a column is None
    # in every row (all requests failed) inference may fail -- consider
    # passing an explicit schema that matches ``df``.
    df = df.union(spark.createDataFrame(result_rows))
    return df