В этой статье мы подробно разберём Python-скрипт, который автоматически получает данные обменных курсов валют с сайта Центробанка РФ, преобразует их в удобный формат и загружает в базу данных PostgreSQL с использованием механизма UPSERT. Рассмотрим архитектуру решения, ключевые моменты реализации и нюансы работы с API, обработкой данных и соединением с базой.
Обзор функциональности
Скрипт состоит из двух основных функций:
— отвечает за получение данных обменных курсов по четырём валютам (USD, EUR, GBP, CNY) с использованием API Центробанка РФ, преобразование данных и формирование сводной таблицы с заполнением пропущенных дат.exchange_rates
— получает данные из функцииload_database
, устанавливает соединение с PostgreSQL и выполняет операцию UPSERT (вставка данных с обновлением существующих записей) в указанной таблице.exchange_rates
Кроме того, в скрипте реализовано логирование, которое помогает отслеживать выполнение и отладку процессов.
Аренда и настройка базы данных PostgreSQL для хранения данных о вакансиях
Этот этап довольно прост и понятен: я планирую арендовать базу данных PostgreSQL от провайдера Timeweb Cloud, который является самым бюджетным вариантом. Нам подойдет объем базы данных в 8 ГБ.
Затем переходим в административную панель нашей базы данных и создаем таблицу для хранения данных.
CREATE TABLE IF NOT EXISTS currency (
date DATE PRIMARY KEY,
usd NUMERIC(10, 4),
eur NUMERIC(10, 4),
gbp NUMERIC(10, 4),
cny NUMERIC(10, 4)
);
Получение данных обменных курсов
Теперь у нас есть все необходимое для запуска скрипта, но не забудьте указать данные вашей базы данных, чтобы запись в таблицу успешно произошла. Коды других валют вы можете найти на официальном сайте Центробанка РФ, перейдя по ссылке: https://www.cbr.ru/scripts/XML_val.asp, где представлен список валют с их идентификаторами для получения данных по динамике курсов. Чтобы добавить валюту, внесите её код в список currencies и добавьте соответствие в словарь currency_codes.
#!/usr/bin/env python3
import sys
import logging
import datetime
import psycopg2
import requests
import pandas as pd
from psycopg2.extras import execute_values
# Настройка логирования
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.propagate = False
logger.handlers = []
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logger.addHandler(handler)
def exchange_rates(start_date: str, end_date: str, max_retries: int = 5) -> pd.DataFrame:
"""
Получает данные обменных курсов валют с сайта Центробанка РФ за указанный период.
Функция запрашивает данные для следующих валют:
- R01235: USD
- R01239: EUR
- R01270: GBP
- R01375: CNY
Для каждой валюты выполняется запрос к API Центробанка РФ по динамическому XML:
https://www.cbr.ru/scripts/XML_dynamic.asp.
Коды валют (например, R01235 для USD) берутся с ресурса:
https://www.cbr.ru/scripts/XML_val.asp
При успешном получении ответа данные преобразуются:
- Преобразование даты в тип datetime (с указанием формата dayfirst=True).
- Замена запятой на точку в значениях курсов и преобразование в float.
- Добавление идентификатора валюты (usd, eur, gbp, cny).
После получения данных для всех валют происходит их объединение в единый DataFrame.
Результирующий DataFrame преобразуется в сводную таблицу, где:
- Столбец 'date' представляет даты,
- Столбцы соответствуют валютам.
Недостающие даты заполняются предыдущими значениями (методом forward fill).
Parameters:
-----------
start_date : str
Начальная дата периода запроса (например, '01/01/2020').
end_date : str
Конечная дата периода запроса (например, '31/01/2020').
max_retries : int, optional
Максимальное количество попыток запроса для каждой валюты (по умолчанию 5).
Returns:
--------
pd.DataFrame
Сводная таблица с обменными курсами валют, где:
- Индекс 'date' представляет даты,
- Столбцы соответствуют валютам: 'usd', 'eur', 'gbp', 'cny'.
Пропущенные даты заполняются предыдущим значением.
"""
currencies = ['R01235', 'R01239', 'R01270', 'R01375']
currency_codes = {
'R01235': 'usd',
'R01239': 'eur',
'R01270': 'gbp',
'R01375': 'cny'
}
all_data = []
for currency in currencies:
retries = 0
while retries < max_retries:
try:
url = (
f"https://www.cbr.ru/scripts/XML_dynamic.asp?"
f"date_req1={start_date}&date_req2={end_date}&VAL_NM_RQ={currency}"
)
response = requests.get(url)
if response.status_code == 200:
# Читаем XML-данные
data = pd.read_xml(response.content, xpath="//Record")
data['Date'] = pd.to_datetime(data['Date'], dayfirst=True)
data['Value'] = data['Value'].str.replace(',', '.').astype(float)
data['Currency'] = currency_codes[currency]
all_data.append(data)
logger.info(f"Успешно получены данные для валюты: {currency_codes[currency]}")
break
else:
retries += 1
logger.warning(
f"Не удалось получить данные по валюте: {currency_codes[currency]}. "
f"Повторная попытка {retries}/{max_retries}. Код ошибки: {response.status_code}"
)
if retries == max_retries:
raise Exception(
f"Достигнуто максимальное количество повторных попыток для валюты: {currency_codes[currency]}"
)
except requests.exceptions.RequestException as e:
retries += 1
logger.error(
f"Ошибка при выборке данных для валюты: {currency_codes[currency]}: {e}. "
f"Повторная попытка {retries}/{max_retries}"
)
if retries == max_retries:
raise Exception(
f"Достигнуто максимальное количество повторных попыток для валюты: {currency_codes[currency]}"
)
# Объединяем данные в один DataFrame
combined_data = pd.concat(all_data)
data_pivot = combined_data.pivot(index='Date', columns='Currency', values='Value').reset_index()
data_pivot.columns.name = None
data_pivot = data_pivot.rename(columns={'Date': 'date'})
# Формирование полного диапазона дат и заполнение недостающих значений методом forward fill
full_date_range = pd.date_range(start=data_pivot['date'].min(), end=data_pivot['date'].max())
data_pivot = (
data_pivot
.set_index('date')
.reindex(full_date_range, method='ffill')
.rename_axis('date')
.reset_index()
)
return data_pivot
def load_database(db_user: str = "gen_user",
db_password: str = "",
db_host: str = "",
db_port: int = 5432,
db_name: str = "default_db") -> None:
"""
Загружает данные обменных курсов валют в базу данных через UPSERT с использованием psycopg2.
Данные загружаются по следующим шагам:
1. Определяется период с '01/01/2020' до текущей даты (часовой пояс Europe/Moscow).
2. Получаются данные обменных курсов с помощью функции exchange_rates.
3. Данные сортируются по дате (date).
4. Устанавливается соединение с базой данных.
5. Для таблицы prod_model.currency выполняется операция UPSERT с использованием
конструкции PostgreSQL "ON CONFLICT (date) DO UPDATE".
Parameters:
-----------
db_user : str
Имя пользователя базы данных.
db_password : str
Пароль пользователя базы данных.
db_host : str
Хост базы данных.
db_port : int, optional
Порт базы данных (по умолчанию 5432).
db_name : str, optional
Имя базы данных (по умолчанию 'default_db').
Returns:
--------
None
"""
start_date = '01/01/2020'
end_date = datetime.datetime.today().strftime('%d/%m/%Y')
# Получаем данные обменных курсов
df = exchange_rates(start_date, end_date).sort_values(by='date')
# Преобразование данных в список кортежей для вставки
records = list(df[['date', 'usd', 'eur', 'gbp', 'cny']].itertuples(index=False, name=None))
try:
conn = psycopg2.connect(
host=db_host,
port=db_port,
database=db_name,
user=db_user,
password=db_password
)
with conn:
with conn.cursor() as cur:
database_table = "currency"
upsert_query = f"""
INSERT INTO {database_table} (date, usd, eur, gbp, cny)
VALUES %s
ON CONFLICT (date) DO UPDATE SET
usd = EXCLUDED.usd,
eur = EXCLUDED.eur,
gbp = EXCLUDED.gbp,
cny = EXCLUDED.cny
"""
execute_values(cur, upsert_query, records)
logger.info(f"Данные успешно загружены в БД -> {database_table}")
except Exception as e:
logger.error(f"Ошибка при загрузке данных в базу: {e}")
raise
finally:
if conn:
conn.close()
if __name__ == '__main__':
load_database()