# inoreader2readwise/main.py
import os
import time
import json
import requests
import logging
# File that get_last_update_time() reads and update_last_update_time() writes;
# main() stores the newest `added_on` value of the pushed annotations in it.
DATA_STORE_PATH = "/data/last_update_time.txt"
# Configure the root logger so INFO-level messages and above are emitted.
logging.basicConfig(level=logging.INFO)
class APIHandler:
    """Small HTTP helper bound to one base URL and one set of headers.

    Args:
        base_url: Prefix that is concatenated verbatim with every endpoint.
        headers: Mapping of headers sent with each request. When omitted,
            each instance gets its own empty dict.
        timeout: Seconds handed to ``requests`` as the request timeout, so a
            stalled connection raises instead of blocking forever.
    """

    def __init__(self, base_url, headers=None, timeout=30):
        self.base_url = base_url
        # A `headers={}` default would be one dict shared by every instance;
        # build a fresh dict per instance (and copy the caller's mapping).
        self.headers = dict(headers) if headers is not None else {}
        self.timeout = timeout

    def get(self, endpoint, params=None):
        """GET ``base_url + endpoint`` and return the decoded JSON body.

        Raises:
            requests.HTTPError: If the response status is 4xx or 5xx.
        """
        response = requests.get(
            self.base_url + endpoint,
            params=params,
            headers=self.headers,
            timeout=self.timeout,
        )
        response.raise_for_status()
        return response.json()

    def post(self, endpoint, data=None):
        """POST ``data`` serialized as JSON and return the HTTP status code.

        Raises:
            requests.HTTPError: If the response status is 4xx or 5xx.
        """
        response = requests.post(
            self.base_url + endpoint,
            data=json.dumps(data),
            headers=self.headers,
            timeout=self.timeout,
        )
        response.raise_for_status()
        return response.status_code
def get_last_update_time(path=None):
    """Return the stored last-sync timestamp as a stripped string.

    Args:
        path: File to read. Defaults to ``DATA_STORE_PATH``.

    Returns:
        The file's content with surrounding whitespace removed, or ``"0"``
        when the file does not exist yet or is empty, so that a first run
        starts from the beginning instead of failing on every poll.
    """
    if path is None:
        path = DATA_STORE_PATH
    try:
        with open(path, 'r', encoding='utf-8') as file:
            return file.read().strip() or "0"
    except FileNotFoundError:
        # Nothing has been synced yet: there is no state file to read.
        return "0"
def update_last_update_time(new_time, path=None):
    """Overwrite the stored last-sync timestamp with ``new_time``.

    Args:
        new_time: Timestamp to persist. It is converted with ``str()`` first,
            because ``file.write`` only accepts strings and would raise
            ``TypeError`` for a numeric timestamp.
        path: File to write. Defaults to ``DATA_STORE_PATH``.
    """
    if path is None:
        path = DATA_STORE_PATH
    with open(path, 'w', encoding='utf-8') as file:
        file.write(str(new_time))
def get_new_annotations(last_annotation_time):
    """Fetch annotated Inoreader items and return the annotations newer than a cutoff.

    Each returned annotation dict is extended with the ``title`` and
    ``author`` of the item it belongs to.

    Args:
        last_annotation_time: Cutoff timestamp; only annotations whose
            ``added_on`` is strictly greater are returned. May be the string
            read back from the state file.

    Returns:
        A list of annotation dicts, in the order the API returned them.

    Raises:
        requests.HTTPError: If Inoreader answers with a 4xx/5xx status.
    """
    inoreader = APIHandler(
        "https://www.inoreader.com/reader/api/0/stream/contents",
        headers={
            'Authorization': 'Bearer ' + os.getenv("INOREADER_ACCESS_TOKEN"),
        },
    )
    # APIHandler.get() already returns the decoded JSON body, so it must not
    # be passed through json.loads() again (that raises TypeError on a dict).
    data = inoreader.get(
        "/user/-/state/com.google/annotated",
        params={
            "annotations": 1,
            "n": 100,
        },
    )

    cutoff = _as_timestamp(last_annotation_time)
    new_annotations = []
    for item in data["items"]:
        for annotation in item.get("annotations", []):
            if _as_timestamp(annotation['added_on']) <= cutoff:
                continue
            annotation['title'] = item['title']
            annotation['author'] = item['author']
            new_annotations.append(annotation)
    return new_annotations


def _as_timestamp(value):
    """Coerce a stored or API-supplied timestamp to ``int``; blank or None is 0."""
    # The cutoff read from disk is always a str, so comparing it directly with
    # a numeric `added_on` would raise TypeError.
    # NOTE(review): assumes `added_on` is a Unix epoch value (number or digit
    # string) - confirm against the Inoreader API response.
    if value is None or value == "":
        return 0
    return int(value)


def push_annotations_to_readwise(annotations):
    """Send the given annotations to Readwise as highlights in a single POST.

    Args:
        annotations: Iterable of dicts, each providing the ``text``,
            ``title``, ``author`` and ``note`` keys.
    """
    request_headers = {
        'Authorization': 'Token ' + os.getenv("READWISE_ACCESS_TOKEN"),
        'Content-Type': 'application/json'
    }
    client = APIHandler("https://readwise.io", headers=request_headers)

    # Keep only the four fields that are forwarded, in a fixed key order.
    highlights = []
    for entry in annotations:
        highlights.append({
            'text': entry['text'],
            'title': entry['title'],
            'author': entry['author'],
            'note': entry['note'],
        })

    payload = {'highlights': highlights}
    client.post("/api/v2/highlights/", data=payload)
def main(poll_interval=3600, retry_interval=60):
    """Run the sync loop forever.

    After a successful cycle the loop sleeps ``poll_interval`` seconds; after
    a failed one it logs the error with its traceback and retries after
    ``retry_interval`` seconds.

    Args:
        poll_interval: Seconds to wait between successful cycles.
        retry_interval: Seconds to wait before retrying after an error.
    """
    while True:
        try:
            _sync_once()
        except Exception as e:
            # Top-level boundary of a long-running loop: keep going, but log
            # the full traceback so the failure can be diagnosed.
            logging.exception("An error occurred: %s", e)
            time.sleep(retry_interval)
        else:
            time.sleep(poll_interval)


def _sync_once():
    """Push annotations added since the stored timestamp, then advance it."""
    last_annotation_time = get_last_update_time()
    new_annotations = get_new_annotations(last_annotation_time)
    if not new_annotations:
        logging.info("No new annotations found")
        return
    latest_added_on = max(annotation['added_on'] for annotation in new_annotations)
    push_annotations_to_readwise(new_annotations)
    # Persist only after the push succeeded, so a failed push is retried.
    update_last_update_time(latest_added_on)
if __name__ == "__main__":
    # Start the polling loop only when run as a script, not when imported.
    main()