Swapnil
5a5f737ff4
- Fix get_last_update_time (return an int); use a continuation query; sleep 15 minutes between pages; sleep 24 hours between cycles; sleep 1 hour on error; push extra highlight properties.
123 lines
3.7 KiB
Python
123 lines
3.7 KiB
Python
import os
|
|
import time
|
|
import json
|
|
import requests
|
|
import logging
|
|
|
|
# Checkpoint file: holds the `added_on` value of the newest annotation that
# has already been pushed, so each cycle only forwards newer annotations.
DATA_STORE_PATH = "/data/last_update_time.txt"

# Root logger at INFO so the per-cycle status messages are emitted.
logging.basicConfig(level=logging.INFO)
|
|
|
|
class APIHandler:
    """Minimal JSON-over-HTTP client bound to one base URL and header set.

    Args:
        base_url: Prefix prepended verbatim to every endpoint passed to
            ``get`` and ``post``.
        headers: Optional mapping of headers sent with every request. It is
            copied on construction so that instances never share header
            state with each other or with the caller.
        timeout: Seconds ``requests`` waits on the server before raising a
            timeout error. ``None`` disables the limit.
    """

    def __init__(self, base_url, headers=None, timeout=30):
        self.base_url = base_url
        # A `{}` default would be one dict shared by every instance.
        self.headers = {} if headers is None else dict(headers)
        # Without a timeout a stalled connection blocks the caller forever.
        self.timeout = timeout

    def get(self, endpoint, params=None):
        """Send a GET request and return the decoded JSON body.

        Args:
            endpoint: Path appended to ``base_url``.
            params: Optional query-string parameters.

        Returns:
            The response body parsed from JSON.

        Raises:
            requests.HTTPError: If the server answers with a 4xx/5xx status.
        """
        response = requests.get(
            self.base_url + endpoint,
            params=params,
            headers=self.headers,
            timeout=self.timeout,
        )
        response.raise_for_status()
        return response.json()

    def post(self, endpoint, data=None):
        """Send ``data`` serialized as JSON in a POST request.

        Args:
            endpoint: Path appended to ``base_url``.
            data: JSON-serializable payload.

        Returns:
            The HTTP status code of the successful response.

        Raises:
            requests.HTTPError: If the server answers with a 4xx/5xx status.
        """
        response = requests.post(
            self.base_url + endpoint,
            data=json.dumps(data),
            headers=self.headers,
            timeout=self.timeout,
        )
        response.raise_for_status()
        return response.status_code
|
|
|
|
def get_last_update_time(path=None):
    """Return the stored timestamp of the newest annotation already synced.

    Args:
        path: File to read. Defaults to ``DATA_STORE_PATH``.

    Returns:
        The integer stored in the file, or ``0`` when the file does not
        exist or is empty (e.g. on the very first run), so that every
        annotation with a positive timestamp counts as new.

    Raises:
        ValueError: If the file holds text that is not an integer.
    """
    store_path = DATA_STORE_PATH if path is None else path
    try:
        with open(store_path, 'r', encoding='utf-8') as file:
            content = file.read().strip()
    except FileNotFoundError:
        # No checkpoint yet: start from the beginning instead of failing.
        return 0
    return int(content) if content else 0
|
|
|
|
def update_last_update_time(new_time, path=None):
    """Persist ``new_time`` as the latest synced annotation timestamp.

    The file is overwritten with ``str(new_time)``.

    Args:
        new_time: Value to store.
        path: File to write. Defaults to ``DATA_STORE_PATH``.
    """
    store_path = DATA_STORE_PATH if path is None else path
    with open(store_path, 'w', encoding='utf-8') as file:
        file.write(str(new_time))
|
|
|
|
def get_new_annotations(last_annotation_time):
    """Fetch Inoreader annotations added after ``last_annotation_time``.

    Pages through the annotated-items stream 100 items at a time, following
    the continuation token, and copies each article's title, author and
    canonical links onto its annotations (as ``title``, ``author`` and
    ``sources``).

    Args:
        last_annotation_time: Only annotations whose ``added_on`` is
            strictly greater than this value are returned.

    Returns:
        A list of annotation dicts, each enriched with ``title``,
        ``author`` and ``sources``.

    Raises:
        RuntimeError: If ``INOREADER_ACCESS_TOKEN`` is not set.
        requests.HTTPError: If Inoreader answers with an error status.
    """
    access_token = os.getenv("INOREADER_ACCESS_TOKEN")
    if not access_token:
        # Fail with a clear message instead of a `str + None` TypeError.
        raise RuntimeError(
            "INOREADER_ACCESS_TOKEN environment variable is not set"
        )

    inoreader = APIHandler(
        "https://www.inoreader.com/reader/api/0/stream/contents",
        headers={'Authorization': 'Bearer ' + access_token},
    )

    all_annotations = []
    continuation = None

    while True:
        params = {"annotations": 1, "n": 100}
        if continuation:
            params["c"] = continuation

        # APIHandler.get already returns the decoded JSON body; passing it
        # through json.loads again would raise TypeError.
        data = inoreader.get(
            "/user/-/state/com.google/annotated",
            params=params,
        )

        for item in data.get("items", []):
            for annotation in item.get("annotations") or []:
                # Tolerate items that lack some of the optional metadata.
                annotation['title'] = item.get('title')
                annotation['author'] = item.get('author')
                annotation['sources'] = item.get('canonical') or []
                all_annotations.append(annotation)

        # A missing or empty token means the last page has been reached. An
        # empty token must not be followed or the first page is refetched.
        continuation = data.get('continuation')
        if not continuation:
            break
        time.sleep(900)  # Sleep for 15 minutes between pages

    return [
        annotation
        for annotation in all_annotations
        if annotation['added_on'] > last_annotation_time
    ]
|
|
|
|
def _to_readwise_highlight(annotation):
    """Map one enriched annotation dict to a Readwise highlight payload."""
    from datetime import datetime, timezone

    added_on = annotation.get('added_on')
    if isinstance(added_on, (int, float)):
        # Readwise documents `highlighted_at` as an ISO 8601 datetime.
        # NOTE(review): assumes `added_on` is epoch seconds - confirm
        # against a real Inoreader response.
        highlighted_at = datetime.fromtimestamp(
            added_on, tz=timezone.utc
        ).isoformat()
    else:
        highlighted_at = added_on

    sources = annotation.get('sources') or []
    return {
        'text': annotation['text'],
        'title': annotation.get('title'),
        'author': annotation.get('author'),
        'note': annotation.get('note'),
        'highlighted_at': highlighted_at,
        'category': 'articles',
        'source_url': sources[0].get('href') if sources else None,
    }


def push_annotations_to_readwise(annotations):
    """Create one Readwise highlight per annotation in a single request.

    Args:
        annotations: Annotation dicts carrying ``text`` and, optionally,
            ``title``, ``author``, ``note``, ``added_on`` and ``sources``
            (a list of dicts with an ``href`` key).

    Returns:
        None. Nothing is sent when ``annotations`` is empty.

    Raises:
        RuntimeError: If ``READWISE_ACCESS_TOKEN`` is not set.
        requests.HTTPError: If Readwise answers with an error status.
    """
    if not annotations:
        # Nothing to push: skip the network round trip entirely.
        return

    access_token = os.getenv("READWISE_ACCESS_TOKEN")
    if not access_token:
        # Fail with a clear message instead of a `str + None` TypeError.
        raise RuntimeError(
            "READWISE_ACCESS_TOKEN environment variable is not set"
        )

    readwise = APIHandler(
        "https://readwise.io",
        headers={
            'Authorization': 'Token ' + access_token,
            'Content-Type': 'application/json',
        },
    )

    readwise.post(
        "/api/v2/highlights/",
        data={
            'highlights': [
                _to_readwise_highlight(annotation)
                for annotation in annotations
            ]
        },
    )
|
|
|
|
def main():
    """Run the Inoreader-to-Readwise sync loop forever.

    Each cycle reads the stored checkpoint, fetches newer annotations,
    pushes them to Readwise and then advances the checkpoint. A successful
    cycle is followed by a 24-hour sleep; a failed one is logged with its
    traceback and retried after 1 hour.
    """
    while True:
        try:
            last_annotation_time = get_last_update_time()
            new_annotations = get_new_annotations(last_annotation_time)

            if new_annotations:
                latest_added_on = max(
                    annotation['added_on'] for annotation in new_annotations
                )
                push_annotations_to_readwise(new_annotations)
                # Advance the checkpoint only after the push succeeded, so a
                # failed push is retried with the same annotations.
                update_last_update_time(latest_added_on)
            else:
                logging.info("No new annotations found")
        except Exception as e:
            # Top-level boundary: keep the daemon alive, but keep the
            # traceback so the failure can be diagnosed from the logs.
            logging.exception("An error occurred: %s", e)
            time.sleep(3600)  # Sleep for 1 hour in case of error
        else:
            time.sleep(86400)  # Sleep for 24 hours
|
|
|
|
if __name__ == "__main__":
    # Start the sync loop only when the file is executed as a script.
    main()