"""Flask front end that signs a user in through Inoreader OAuth2.

The OAuth tokens and a Readwise API key are stored in an external token
service reached over HTTP at ``DATABASE_URL``; the browser session only
keeps the id of the stored token record.
"""

import os
from datetime import datetime
from urllib.parse import urlencode

import requests
from flask import Flask, render_template, request, redirect, abort, url_for, session

# Seconds to wait on any outbound HTTP call. Without a timeout `requests`
# blocks indefinitely and a stalled upstream would pin a worker forever.
REQUEST_TIMEOUT = 10


def get_env_variable(var_name):
    """Return the value of environment variable *var_name*.

    Raises:
        ValueError: if the variable is unset or empty.
    """
    value = os.environ.get(var_name)
    if not value:
        raise ValueError(f"Missing required environment variable: {var_name}")
    return value


app = Flask(__name__)

# Read environment variables once at import time rather than per request.
client_id = get_env_variable('CLIENT_ID')
client_secret = get_env_variable('CLIENT_SECRET')
redirect_uri = get_env_variable('REDIRECT_URI')
optional_scopes = get_env_variable('OPTIONAL_SCOPES')
database_url = get_env_variable('DATABASE_URL')
secret_key = get_env_variable('APP_SECRET_KEY')

# Set secret key to enable sessions
app.secret_key = secret_key

# Inoreader OAuth2 / API endpoints.
AUTH_URL = 'https://www.inoreader.com/oauth2/auth'
TOKEN_URL = 'https://www.inoreader.com/oauth2/token'
USER_INFO_URL = 'https://www.inoreader.com/reader/api/0/user-info'


@app.route('/')
def home():
    """Show the main menu to a logged-in user, otherwise the login page."""
    if is_logged_in():
        return main_menu()
    return generate_login_page()


def main_menu():
    """Render ``home.html`` for the token record referenced by the session."""
    token_id = session.get('token_id')
    token = get_token_from_database(token_id)
    user_info = get_user_info(token['access_token'])
    # NOTE(review): format_sync_times is not defined anywhere in this module
    # and the token fields it should read are not visible here, so it has to
    # be supplied before this view can work.
    last_synced, next_sync = format_sync_times(token)
    return render_template(
        'home.html',
        user_login=user_info.get('userName'),
        user_email=user_info.get('userEmail'),
        readwise_api_key=token.get('readwise_api_key', ''),
        last_synced=last_synced,
        next_sync=next_sync,
    )


def generate_login_page():
    """Render ``login.html`` with an Inoreader authorization URL.

    A fresh random ``state`` value is stored in the session so the OAuth
    callback can verify that the redirect belongs to this browser.
    """
    session['csrf_protection_string'] = os.urandom(16).hex()
    oauth_params = {
        'client_id': client_id,
        'redirect_uri': redirect_uri,
        'response_type': 'code',
        'scope': optional_scopes,
        'state': session['csrf_protection_string'],
    }
    oauth_url = f'{AUTH_URL}?{urlencode(oauth_params)}'
    return render_template('login.html', oauth_url=oauth_url)


@app.route('/oauth-redirect')
def oauth_redirect():
    """Handle the OAuth callback: verify state, exchange the code, log in."""
    auth_code = request.args.get('code')
    csrf_token = request.args.get('state')

    # Pop so a state value can be used only once. Require that a value was
    # actually issued: comparing two missing values (None vs None) must not
    # count as a match.
    expected_state = session.pop('csrf_protection_string', None)
    if not expected_state or csrf_token != expected_state:
        abort(403, 'Invalid CSRF token. Please try again.')

    if not auth_code:
        # Nothing to exchange; fail here instead of posting code=None upstream.
        abort(400, 'Missing authorization code.')

    # Exchange authorization code for access and refresh tokens using the
    # Inoreader API.
    response = requests.post(
        TOKEN_URL,
        headers={'Accept': 'application/json'},
        data={
            'code': auth_code,
            'redirect_uri': redirect_uri,
            'client_id': client_id,
            'client_secret': client_secret,
            'grant_type': 'authorization_code',
        },
        timeout=REQUEST_TIMEOUT,
    )
    raise_for_status(response)
    token = response.json()

    # Fetch user information from Inoreader
    user_info = get_user_info(token.get('access_token'))

    # Save tokens for later use. save_or_update_token takes the individual
    # fields, not the whole token response.
    token_id = save_or_update_token(
        user_info.get('userEmail'),
        token.get('access_token'),
        token.get('refresh_token'),
        token.get('expires_in'),
    )
    set_session_token_id(token_id)

    return redirect(url_for('home'))


@app.route('/logout', methods=['POST'])
def logout():
    """Forget the token id held in the session and return to the home page.

    Only the browser session is cleared; the stored token record is not
    modified.
    """
    # TODO: optionally mark the stored record as logged out in the token
    # service as well (the original draft PUT {'is_logged_in': False}).
    session.pop('token_id', None)
    return redirect(url_for('home'))


@app.route('/readwise', methods=['POST'])
def submit_readwise_api():
    """Store the submitted Readwise API key on the current token record."""
    token_id = session.get('token_id')
    if not token_id:
        return redirect(url_for('home'))

    response = requests.put(
        f'{database_url}/token/{token_id}',
        headers={'Content-Type': 'application/json'},
        json={'readwise_api_key': request.form.get('readwise_api_key')},
        timeout=REQUEST_TIMEOUT,
    )
    raise_for_status(response)

    return redirect(url_for('home'))


def is_logged_in():
    """Return True when the session points at an active stored token."""
    token_id = session.get('token_id')
    if not token_id:
        return False

    response = requests.get(
        f'{database_url}/token/{token_id}', timeout=REQUEST_TIMEOUT
    )
    if response.status_code == 404:
        # A stale session id should read as "logged out" rather than break
        # the home page. Assumes the token service answers 404 for unknown
        # ids - TODO confirm.
        return False
    raise_for_status(response)

    token = response.json().get('token') or {}
    return token.get('active', False)


def get_token_from_database(token_id):
    """Fetch and return the stored token record for *token_id*.

    Raises:
        Exception: via raise_for_status when the token service answers
            with a non-2xx status.
    """
    response = requests.get(
        f'{database_url}/token/{token_id}', timeout=REQUEST_TIMEOUT
    )
    raise_for_status(response)
    return response.json()['token']


def get_user_info(access_token):
    """Return the Inoreader user-info JSON for *access_token*.

    Raises:
        Exception: via raise_for_status when Inoreader answers with a
            non-2xx status.
    """
    response = requests.get(
        USER_INFO_URL,
        headers={'Authorization': f'Bearer {access_token}'},
        timeout=REQUEST_TIMEOUT,
    )
    raise_for_status(response)
    return response.json()


def save_or_update_token(email, access_token, refresh_token, expiration_seconds):
    """Create or update the token record for *email* and return its id."""
    # Pass the email via ``params`` so characters such as '+' or '&' are
    # URL-encoded instead of corrupting the query string.
    response = requests.get(
        f'{database_url}/token',
        params={'email': email},
        timeout=REQUEST_TIMEOUT,
    )

    if response.status_code == 404:
        # Check before raise_for_status, which would otherwise turn "no
        # record yet" into an error and make the create path unreachable.
        # Assumes the token service answers 404 for an unknown email -
        # TODO confirm.
        return add_login(email, access_token, refresh_token, expiration_seconds)

    raise_for_status(response)

    if response.status_code == 200:
        return update_login(
            response.json()['token']['id'],
            access_token,
            refresh_token,
            expiration_seconds,
        )
    return add_login(email, access_token, refresh_token, expiration_seconds)


def add_login(email, access_token, refresh_token, expiration_seconds):
    """Create a token record and return the ``id`` from the response body."""
    response = requests.post(
        f'{database_url}/token',
        headers={'Content-Type': 'application/json'},
        json={
            'email': email,
            'access_token': access_token,
            'refresh_token': refresh_token,
            'expiration_seconds': expiration_seconds,
        },
        timeout=REQUEST_TIMEOUT,
    )
    raise_for_status(response)
    return response.json().get('id')


def update_login(token_id, access_token, refresh_token, expiration_seconds):
    """Overwrite the tokens on record *token_id* and return *token_id*."""
    response = requests.put(
        f'{database_url}/token/{token_id}',
        headers={'Content-Type': 'application/json'},
        json={
            'access_token': access_token,
            'refresh_token': refresh_token,
            'expiration_seconds': expiration_seconds,
        },
        timeout=REQUEST_TIMEOUT,
    )
    raise_for_status(response)
    return token_id


def set_session_token_id(token_id):
    """Remember *token_id* in the browser session."""
    session['token_id'] = token_id


def raise_for_status(response):
    """Raise ``Exception`` when *response* has a non-2xx status code.

    The message carries the status code and the ``error`` field of a JSON
    body when there is one, falling back to the raw response text.
    """
    if response.status_code in range(200, 300):
        return
    try:
        msg = response.json().get('error', 'No error message provided')
    except (ValueError, AttributeError):
        # Body is not JSON, or is JSON but not an object.
        msg = response.text
    raise Exception(f'HTTPError: {response.status_code} - Message: {msg}')


if __name__ == '__main__':
    # The interactive debugger allows arbitrary code execution, so it must
    # not be on by default while listening on all interfaces. Opt in with
    # FLASK_DEBUG=1 for local development.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true')
    app.run(host='0.0.0.0', debug=debug_enabled, port=5000)