Инструменты

Парсер курса валют ЦБ РФ: получение данных и загрузка в PostgreSQL

В этой статье мы подробно разберём Python-скрипт, который автоматически получает данные обменных курсов валют с сайта Центробанка РФ, преобразует их в удобный формат и загружает в базу данных PostgreSQL с использованием механизма UPSERT. Рассмотрим архитектуру решения, ключевые моменты реализации и нюансы работы с API, обработкой данных и соединением с базой.


Обзор функциональности

Скрипт состоит из двух основных функций:

  1. exchange_rates — отвечает за получение данных обменных курсов по четырём валютам (USD, EUR, GBP, CNY) с использованием API Центробанка РФ, преобразование данных и формирование сводной таблицы с заполнением пропущенных дат.
  2. load_database — получает данные из функции exchange_rates, устанавливает соединение с PostgreSQL и выполняет операцию UPSERT (вставка данных с обновлением существующих записей) в указанной таблице.

Кроме того, в скрипте реализовано логирование, которое помогает отслеживать выполнение и отладку процессов.

Аренда и настройка базы данных PostgreSQL для хранения данных о вакансиях

Этот этап довольно прост и понятен: я планирую арендовать базу данных PostgreSQL от провайдера Timeweb Cloud, который является самым бюджетным вариантом. Нам подойдет объем базы данных в 8 ГБ. 

Затем переходим в административную панель нашей базы данных и создаем таблицу для хранения данных.

SQL
CREATE TABLE IF NOT EXISTS currency (
    date DATE PRIMARY KEY,
    usd NUMERIC(10, 4),
    eur NUMERIC(10, 4),
    gbp NUMERIC(10, 4),
    cny NUMERIC(10, 4)
);

Получение данных обменных курсов

Теперь у нас есть все необходимое для запуска скрипта, но не забудьте указать данные вашей базы данных, чтобы запись в таблицу успешно произошла. Коды других валют вы можете найти на официальном сайте Центробанка РФ, перейдя по ссылке: https://www.cbr.ru/scripts/XML_val.asp, где представлен список валют с их идентификаторами для получения данных по динамике курсов. Чтобы добавить валюту, внесите её код в список currencies и добавьте соответствие в словарь currency_codes.

Python
#!/usr/bin/env python3
import sys
import logging
import datetime
import psycopg2
import requests
import pandas as pd
from psycopg2.extras import execute_values

# Настройка логирования
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.propagate = False
logger.handlers = []

handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logger.addHandler(handler)



def exchange_rates(start_date: str, end_date: str, max_retries: int = 5) -> pd.DataFrame:
    """
    Получает данные обменных курсов валют с сайта Центробанка РФ за указанный период.

    Функция запрашивает данные для следующих валют:
      - R01235: USD
      - R01239: EUR
      - R01270: GBP
      - R01375: CNY

    Для каждой валюты выполняется запрос к API Центробанка РФ по динамическому XML:
      https://www.cbr.ru/scripts/XML_dynamic.asp.
    Коды валют (например, R01235 для USD) берутся с ресурса:
      https://www.cbr.ru/scripts/XML_val.asp

    При успешном получении ответа данные преобразуются:
      - Преобразование даты в тип datetime (с указанием формата dayfirst=True).
      - Замена запятой на точку в значениях курсов и преобразование в float.
      - Добавление идентификатора валюты (usd, eur, gbp, cny).

    После получения данных для всех валют происходит их объединение в единый DataFrame.
    Результирующий DataFrame преобразуется в сводную таблицу, где:
      - Столбец 'date' представляет даты,
      - Столбцы соответствуют валютам.
    Недостающие даты заполняются предыдущими значениями (методом forward fill).

    Parameters:
    -----------
    start_date : str
        Начальная дата периода запроса (например, '01/01/2020').
    end_date : str
        Конечная дата периода запроса (например, '31/01/2020').
    max_retries : int, optional
        Максимальное количество попыток запроса для каждой валюты (по умолчанию 5).

    Returns:
    --------
    pd.DataFrame
        Сводная таблица с обменными курсами валют, где:
          - Индекс 'date' представляет даты,
          - Столбцы соответствуют валютам: 'usd', 'eur', 'gbp', 'cny'.
        Пропущенные даты заполняются предыдущим значением.
    """
    currencies = ['R01235', 'R01239', 'R01270', 'R01375']
    currency_codes = {
        'R01235': 'usd',
        'R01239': 'eur',
        'R01270': 'gbp',
        'R01375': 'cny'
    }

    all_data = []

    for currency in currencies:
        retries = 0
        while retries < max_retries:
            try:
                url = (
                    f"https://www.cbr.ru/scripts/XML_dynamic.asp?"
                    f"date_req1={start_date}&date_req2={end_date}&VAL_NM_RQ={currency}"
                )
                response = requests.get(url)

                if response.status_code == 200:
                    # Читаем XML-данные
                    data = pd.read_xml(response.content, xpath="//Record")
                    data['Date'] = pd.to_datetime(data['Date'], dayfirst=True)
                    data['Value'] = data['Value'].str.replace(',', '.').astype(float)
                    data['Currency'] = currency_codes[currency]
                    all_data.append(data)
                    logger.info(f"Успешно получены данные для валюты: {currency_codes[currency]}")
                    break
                else:
                    retries += 1
                    logger.warning(
                        f"Не удалось получить данные по валюте: {currency_codes[currency]}. "
                        f"Повторная попытка {retries}/{max_retries}. Код ошибки: {response.status_code}"
                    )
                    if retries == max_retries:
                        raise Exception(
                            f"Достигнуто максимальное количество повторных попыток для валюты: {currency_codes[currency]}"
                        )
            except requests.exceptions.RequestException as e:
                retries += 1
                logger.error(
                    f"Ошибка при выборке данных для валюты: {currency_codes[currency]}: {e}. "
                    f"Повторная попытка {retries}/{max_retries}"
                )
                if retries == max_retries:
                    raise Exception(
                        f"Достигнуто максимальное количество повторных попыток для валюты: {currency_codes[currency]}"
                    )

    # Объединяем данные в один DataFrame
    combined_data = pd.concat(all_data)
    data_pivot = combined_data.pivot(index='Date', columns='Currency', values='Value').reset_index()
    data_pivot.columns.name = None
    data_pivot = data_pivot.rename(columns={'Date': 'date'})

    # Формирование полного диапазона дат и заполнение недостающих значений методом forward fill
    full_date_range = pd.date_range(start=data_pivot['date'].min(), end=data_pivot['date'].max())
    data_pivot = (
        data_pivot
        .set_index('date')
        .reindex(full_date_range, method='ffill')
        .rename_axis('date')
        .reset_index()
    )

    return data_pivot


def load_database(db_user: str = "gen_user",
                  db_password: str = "",
                  db_host: str = "",
                  db_port: int = 5432,
                  db_name: str = "default_db") -> None:
    """
    Загружает данные обменных курсов валют в базу данных через UPSERT с использованием psycopg2.

    Данные загружаются по следующим шагам:
      1. Определяется период с '01/01/2020' до текущей даты (часовой пояс Europe/Moscow).
      2. Получаются данные обменных курсов с помощью функции exchange_rates.
      3. Данные сортируются по дате (date).
      4. Устанавливается соединение с базой данных.
      5. Для таблицы prod_model.currency выполняется операция UPSERT с использованием
         конструкции PostgreSQL "ON CONFLICT (date) DO UPDATE".

    Parameters:
    -----------
    db_user : str
        Имя пользователя базы данных.
    db_password : str
        Пароль пользователя базы данных.
    db_host : str
        Хост базы данных.
    db_port : int, optional
        Порт базы данных (по умолчанию 5432).
    db_name : str, optional
        Имя базы данных (по умолчанию 'default_db').

    Returns:
    --------
    None
    """
    start_date = '01/01/2020'
    end_date = datetime.datetime.today().strftime('%d/%m/%Y')
    # Получаем данные обменных курсов
    df = exchange_rates(start_date, end_date).sort_values(by='date')

    # Преобразование данных в список кортежей для вставки
    records = list(df[['date', 'usd', 'eur', 'gbp', 'cny']].itertuples(index=False, name=None))

    try:
        conn = psycopg2.connect(
            host=db_host,
            port=db_port,
            database=db_name,
            user=db_user,
            password=db_password
        )
        with conn:
            with conn.cursor() as cur:
                database_table = "currency"
                upsert_query = f"""
                    INSERT INTO {database_table} (date, usd, eur, gbp, cny)
                    VALUES %s
                    ON CONFLICT (date) DO UPDATE SET
                        usd = EXCLUDED.usd,
                        eur = EXCLUDED.eur,
                        gbp = EXCLUDED.gbp,
                        cny = EXCLUDED.cny
                """
                execute_values(cur, upsert_query, records)
                logger.info(f"Данные успешно загружены в БД -> {database_table}")
    except Exception as e:
        logger.error(f"Ошибка при загрузке данных в базу: {e}")
        raise
    finally:
        if conn:
            conn.close()


if __name__ == '__main__':
    load_database()
Пред.
Введение в анализ временных рядов

Введение в анализ временных рядов

Содержание Показать Временной ряд?

След.
Математический аппарат Facebook Prophet

Математический аппарат Facebook Prophet

Содержание Показать 1