import os from flask import Flask, render_template, request, redirect, abort, url_for, session import requests from datetime import datetime def get_env_variable(var_name): value = os.environ.get(var_name) if not value: raise ValueError(f"Missing required environment variable: {var_name}") return value app = Flask(__name__) # Read environment variables outside the route function client_id = get_env_variable('CLIENT_ID') client_secret = get_env_variable('CLIENT_SECRET') redirect_uri = get_env_variable('REDIRECT_URI') optional_scopes = get_env_variable('OPTIONAL_SCOPES') database_url = get_env_variable('DATABASE_URL') secret_key = get_env_variable('APP_SECRET_KEY') # Set secret key to enable sessions app.secret_key = secret_key # https://www.inoreader.com/oauth2/auth AUTH_URL = 'https://github.com/login/oauth/authorize' @app.route('/') def home(): if is_logged_in(): token_id = session.get('token_id') resp = requests.get(f'{database_url}/token/{token_id}') raise_for_status(resp) resp_json = resp.json() token = resp_json['token'] user_info = requests.get('https://api.github.com/user', headers={ 'Authorization': f'Bearer {token.get("access_token")}' }).json() last_synced = datetime.fromtimestamp(token.get('updated_at')).strftime('%Y-%m-%d %H:%M:%S') next_sync = datetime.fromtimestamp(token.get('updated_at') + token.get('expiration_seconds')).strftime('%Y-%m-%d %H:%M:%S') return render_template('home.html', user_login=user_info.get('login'), user_email=user_info.get('email'), # for inoreader it's userName and userEmail readwise_api_key=token.get('readwise_api_key') or '', last_synced=last_synced, next_sync=next_sync) # Generate a CSRF protection string session['csrf_protection_string'] = os.urandom(16).hex() # Pass dynamic variables to the template return render_template('login.html', auth_url=AUTH_URL, client_id=client_id, redirect_uri=redirect_uri, optional_scopes=optional_scopes, csrf_protection_string=session.get('csrf_protection_string')) @app.route('/oauth-redirect') def oauth_redirect(): auth_code = request.args.get('code') csrf_token = request.args.get('state') # Verify the CSRF protection string if csrf_token != session.get('csrf_protection_string'): abort(403, 'Invalid CSRF token. Please try again.') # Exchange authorization code for access and refresh tokens # response = requests.post( # 'https://www.inoreader.com/oauth2/token', # headers={ # 'Content-Type': 'application/x-www-form-urlencoded', # }, # data={ # 'code': auth_code, # 'redirect_uri': redirect_uri, # 'client_id': client_id, # 'client_secret': client_secret, # 'scope': '', # 'grant_type': 'authorization_code' # } # ) # TEST: Github OAuth - REMOVE response = requests.post( 'https://github.com/login/oauth/access_token', headers={ 'Accept': 'application/json' }, data={ 'code': auth_code, 'redirect_uri': redirect_uri, 'client_id': client_id, 'client_secret': client_secret, } ) raise_for_status(response) token = response.json() # TEST: Github OAuth - REMOVE token['refresh_token'] = 'N/A' token['expires_in'] = 3600 # REPLACE user API call with inoreader API call # https://www.inoreader.com/reader/api/0/user-info user_info = requests.get('https://api.github.com/user', headers={ 'Authorization': f'Bearer {token.get("access_token")}' }).json() # Save tokens for later use token_id = save_token( user_info.get('email'), # for inoreader it's userEmail token.get('access_token'), token.get('refresh_token'), token.get('expires_in') ) set_session_token_id(token_id) return redirect(url_for('home')) # logout @app.route('/logout', methods=['POST']) def logout(): token_id = session.get('token_id') if not token_id: return redirect(url_for('home')) # remove token_id from session session.pop('token_id', None) # response = requests.put(f'{database_url}/token/{token_id}', headers={ # 'Content-Type': 'application/json' # }, json={ # 'is_logged_in': False # }) # response.raise_for_status() return redirect(url_for('home')) @app.route('/readwise', methods=['POST']) def submit_readwise_api(): token_id = session.get('token_id') if not token_id: return redirect(url_for('home')) response = requests.put(f'{database_url}/token/{token_id}', headers={ 'Content-Type': 'application/json' }, json={ 'readwise_api_key': request.form.get('readwise_api_key') }) raise_for_status(response) return redirect(url_for('home')) def is_logged_in(): token_id = session.get('token_id') if not token_id: return False response = requests.get(f'{database_url}/token/{token_id}') raise_for_status(response) resp_json = response.json() token = resp_json['token'] return token.get('active', False) def save_token(email, access_token, refresh_token, expiration_seconds): # check if an active token with this email already exists token_by_email_resp = requests.get(f'{database_url}/token?email={email}') raise_for_status(token_by_email_resp) if token_by_email_resp.status_code != 200: response = requests.post( f'{database_url}/token', headers={ 'Content-Type': 'application/json' }, json={ 'email': email, 'access_token': access_token, 'refresh_token': refresh_token, 'expiration_seconds': expiration_seconds } ) raise_for_status(response) return response.json().get('id') else: token_by_email_resp_json = token_by_email_resp.json() token = token_by_email_resp_json['token'] response = requests.put( f'{database_url}/token/{token["id"]}', headers={ 'Content-Type': 'application/json' }, json={ 'access_token': access_token, 'refresh_token': refresh_token, 'expiration_seconds': expiration_seconds, } ) raise_for_status(response) return token['id'] def set_session_token_id(token_id): session['token_id'] = token_id def raise_for_status(response): if response.status_code not in range(200, 300): msg = None try: msg = response.json().get('error', '') except: msg = response.text raise Exception(f'HTTPError: {response.status_code} \n Message: {msg}') if __name__ == '__main__': app.run(host='0.0.0.0', debug=True, port=5000)