"""Geocode a list of travel destinations and cache the results in SQLite.

Reads place names (one per line) from ``data_source_file``, queries the
Geoapify geocode search API for each one and stores the JSON response,
serialised as text, in the ``destinations`` table of ``database_file_name``.
Any unrecoverable error is reported through ``deal_with_error``, which closes
the database connection and terminates the process.
"""

import json
import sqlite3
import sys
import urllib
import urllib.parse
from typing import Iterable, NoReturn, Optional

import requests

# The text file with the travel destinations, one per line
data_source_file = 'titustravel.data'

# The common parts of the geoapify url we'll need to query the API about each place
# The missing part is the place name, which will need to be URL encoded before being
# sandwiched between these two strings to form the full URL
# NOTE: the query text must start empty here; a literal prefix (such as a
# house number copied from a sample URL) would be prepended to every place.
service_url = 'https://api.geoapify.com/v1/geocode/search?text='
api_key = '&apiKey=YOUR_API_KEY_HERE'

# Upper bound, in seconds, on how long a single API request may block
REQUEST_TIMEOUT_SECONDS = 30

# The name of the database we're going to create
database_file_name = "traveldata.db"

# Global variables
database_connection: Optional[sqlite3.Connection] = None
database_cursor: Optional[sqlite3.Cursor] = None

# SQL constants
TABLE_NAME = "destinations"
COLUMN_PLACE = "place"
COLUMN_DATA = "data"


# TOP LEVEL FUNCTIONS

def process_travel_destinations_input_file(file: Iterable[str]):
    """Process every non-blank line of *file* as one travel destination.

    Args:
        file: An iterable of text lines (e.g. an open text file). Leading and
            trailing whitespace is stripped; blank lines are skipped.
    """
    for line in file:
        place = line.strip()
        if place:
            process_travel_destination(place)


def process_travel_destination(travelDestination: str):
    """Look up one destination via the API and store the result.

    Destinations that are already in the database are skipped before the API
    is contacted, so re-running the script does not repeat requests.

    Args:
        travelDestination: The place name as read from the input file.
    """
    print("=====================================")
    print("Processing travel destination:", travelDestination)

    # Check first: there is no point spending an API request on a place we
    # would refuse to insert afterwards.
    check_database_connection_is_valid()
    if is_destination_already_in_database(travelDestination):
        return

    url = convert_travel_destination_to_url(travelDestination)
    data_for_destination = query_api_for_travel_destination_data(url)
    store_data_in_database(travelDestination, data_for_destination)


# API CALL FUNCTIONS

def query_api_for_travel_destination_data(url: str):
    """Fetch *url* and return the decoded JSON body.

    Args:
        url: The complete request URL, including the API key.

    Returns:
        The value produced by ``response.json()``.

    Raises:
        SystemExit: via ``deal_with_error`` if the request fails, the status
            code is not 200, or the body is not valid JSON.
    """
    try:
        response = requests.get(url, timeout=REQUEST_TIMEOUT_SECONDS)
    except requests.RequestException as e:
        # Without aborting here, ``response`` would be unbound below.
        deal_with_error('Error querying the geo API.', e)

    if response.status_code != 200:
        deal_with_error(
            f"API request failed with status code {response.status_code}", None)

    try:
        data = response.json()
    except ValueError as e:
        deal_with_error("GeoAPI response was not valid JSON.", e)

    print("GeoAPI request successful! Data for destination received.")
    return data


def convert_travel_destination_to_url(travelDestination: str):
    """Return the full API URL for *travelDestination*.

    The URL-encoded place name is placed between ``service_url`` and
    ``api_key``.
    """
    encoded_place = url_encode_travel_destination(travelDestination)
    return f'{service_url}{encoded_place}{api_key}'


def url_encode_travel_destination(travelDestination: str):
    """Return *travelDestination* percent-encoded with ``urllib.parse.quote``.

    Raises:
        SystemExit: via ``deal_with_error`` if encoding fails.
    """
    try:
        return urllib.parse.quote(travelDestination)
    except Exception as e:
        deal_with_error(
            f"Error encoding travel destination as a URL: {travelDestination}", e)


# DATABASE FUNCTIONS

def store_data_in_database(place: str, data: dict):
    """Insert *place* and its JSON-serialised *data* unless already stored.

    Args:
        place: The destination name, used as the lookup key.
        data: JSON-serialisable data returned by the API for that place.

    Raises:
        SystemExit: via ``deal_with_error`` if serialisation or the INSERT fails.
    """
    check_database_connection_is_valid()
    if is_destination_already_in_database(place):
        return

    data_str = _serialise_destination_data(data)
    # Bound parameters let SQLite handle quoting, so place names containing
    # apostrophes (or anything else) cannot break or alter the statement.
    sql_command = (
        f"INSERT INTO {TABLE_NAME} ({COLUMN_PLACE}, {COLUMN_DATA}) VALUES (?, ?)")
    print(f"Storing retrieved JSON data for {place} in database.")
    try:
        database_cursor.execute(sql_command, (place, data_str))
    except sqlite3.Error as e:
        deal_with_error(f"Error executing SQL command: {sql_command}", e)
    commit_database_changes()


def is_destination_already_in_database(place):
    """Return True if a row for *place* exists in the destinations table.

    Expects the database connection to have been established already.

    Raises:
        SystemExit: via ``deal_with_error`` if the query fails.
    """
    sql_command = f"SELECT 1 FROM {TABLE_NAME} WHERE {COLUMN_PLACE} = ? LIMIT 1"
    try:
        database_cursor.execute(sql_command, (place,))
        row = database_cursor.fetchone()
    except sqlite3.Error as e:
        deal_with_error(f"Error executing SQL command: {sql_command}", e)

    if row is None:
        return False
    print(place, "is already in database; not adding.")
    return True


def connect_to_database():
    """Open ``database_file_name`` and set the global connection and cursor.

    Raises:
        SystemExit: via ``deal_with_error`` if the connection cannot be opened.
    """
    global database_connection
    global database_cursor
    # This will create an empty database file if it doesn't already exist
    try:
        database_connection = sqlite3.connect(database_file_name)
        database_cursor = database_connection.cursor()
    except sqlite3.Error as e:
        deal_with_error(f"Error connecting to database: {database_file_name}", e)


def create_destinations_table_in_database_if_it_does_not_exist():
    """Create the destinations table (place TEXT, data TEXT) if missing.

    Raises:
        SystemExit: via ``deal_with_error`` if the statement fails.
    """
    check_database_connection_is_valid()
    sql_command = (
        f'CREATE TABLE IF NOT EXISTS {TABLE_NAME} '
        f'({COLUMN_PLACE} TEXT, {COLUMN_DATA} TEXT)')
    try:
        database_cursor.execute(sql_command)
    except sqlite3.Error as e:
        deal_with_error(f"Error executing SQL command: {sql_command}", e)


def check_database_connection_is_valid():
    """Ensure a connection exists, attempting one reconnect before aborting.

    Raises:
        SystemExit: via ``deal_with_error`` if no connection can be set up.
    """
    if not is_database_connection_valid():
        connect_to_database()
    if not is_database_connection_valid():
        deal_with_error("Database connection failed to initialise!", None)


def is_database_connection_valid():
    """Return True when both the global connection and cursor are set."""
    return database_connection is not None and database_cursor is not None


def _escape_sql_string_literal(text: str) -> str:
    """Double single quotes so *text* can sit inside a quoted SQL literal."""
    return text.replace("'", "''")


def _serialise_destination_data(data: dict) -> str:
    """Return *data* as a JSON string, aborting if it cannot be serialised."""
    try:
        return json.dumps(data)
    except (TypeError, ValueError) as e:
        deal_with_error(f"Data is in unexpected format: {data}", e)


def get_sql_command_to_test_if_destination_already_inserted(place):
    """Return a literal SELECT statement that matches rows for *place*.

    The module's own queries use bound parameters instead; this helper is
    kept for callers that need the statement as a single string.
    """
    escaped_place = _escape_sql_string_literal(place)
    return f"SELECT * FROM {TABLE_NAME} WHERE {COLUMN_PLACE} = '{escaped_place}'"


def get_sql_command_to_insert_destination(place: str, data: dict):
    """Return a literal INSERT statement for *place* and *data*.

    The module's own inserts use bound parameters instead; this helper is
    kept for callers that need the statement as a single string.
    """
    escaped_place = _escape_sql_string_literal(place)
    escaped_data_str = convert_dictionary_data_to_escaped_string(data)
    return (
        f"INSERT INTO {TABLE_NAME} ({COLUMN_PLACE}, {COLUMN_DATA}) "
        f"VALUES ('{escaped_place}', '{escaped_data_str}')")


def convert_dictionary_data_to_escaped_string(data: dict):
    """Return *data* as JSON text with single quotes doubled for SQL literals.

    Raises:
        SystemExit: via ``deal_with_error`` if *data* is not JSON-serialisable.
    """
    return _escape_sql_string_literal(_serialise_destination_data(data))


def commit_database_changes():
    """Commit the current transaction on the global connection."""
    check_database_connection_is_valid()
    database_connection.commit()


# ERROR HANDLING

def deal_with_error(description, e) -> NoReturn:
    """Report an unrecoverable error, close the database and exit.

    Args:
        description: Human-readable summary of what went wrong.
        e: The triggering exception, or None if there is none to report.

    Raises:
        SystemExit: always, with a non-zero exit status.
    """
    print(description)
    if e is not None:
        print('Exception was:', e)
    print('Aborting.')
    if database_connection is not None:
        database_connection.close()
    sys.exit(1)


# MAIN
# NOTE(review): this section runs whenever the module is loaded, including on
# import; consider an `if __name__ == "__main__":` guard if it is ever imported.

connect_to_database()
create_destinations_table_in_database_if_it_does_not_exist()

try:
    # File is closed automatically, even if there's an exception
    with open(data_source_file, 'r') as file:
        process_travel_destinations_input_file(file)
    print('Finished')
except FileNotFoundError:
    print('File not found:', data_source_file)
except Exception as e:
    print('Unexpected error:', e)
finally:
    # Runs on every exit path, including SystemExit raised by deal_with_error;
    # closing an already-closed sqlite3 connection is harmless.
    if database_connection is not None:
        database_connection.close()