Внедрение Elixir в экосистему Solana и почему вы можете захотеть создать свой следующий программный клиент в Elixir

Дерек Мир

Я рад объявить о первых выпусках двух новых пакетов Elixir: solana, неофициальный клиент Solana API, и solana_spl, неофициальный интерфейс Solana Program Library.

Почему?

До того, как я начал работать в венчурной студии DeFi, я работал над веб-приложениями, написанными на Эликсире. Elixir — отличный выбор для создания современных отказоустойчивых параллельных приложений, таких как приложения Web2 или конвейеры данных. Я отметил несколько случаев в работе, которые могли бы выиграть от использования библиотеки Elixir или языковой функции. Недавно, проводя исследование для нового проекта, я нашел один случай, который настолько хорошо соответствует сильным сторонам Elixir, что решил самостоятельно запустить экосистему Elixir-Solana.

Примечание. остальная часть этой статьи предполагает, что у вас есть некоторое представление об Эликсире. Если вы хотите узнать больше, официальный сайт и документация — хорошие места для начала.

Проблема

Соберите все транзакции, связанные с токенами, и запустите на них proprietary operations.

Solana предоставляет несколько методов API для этого: getSignaturesForAccount, который возвращает список подписей транзакций, и getTransaction, который возвращает детали транзакции с учетом ее подписи. Вам нужно вызвать getSignaturesForAccount для монетного двора токена, который вы хотите проанализировать, затем для каждого элемента в ответе вы вызываете getTransaction, чтобы получить подробности.

Некоторые дополнительные ограничения:

  • большинство официальных конечных точек имеют ограничение в 100 запросов каждые 10 секунд на IP.
  • getSignaturesForAccount возвращает максимум 1000 подписей; большинство крупных дистрибутивов токенов имеют на порядок больше транзакций, чем это.

Я не знаю, как лучше всего это сделать в JavaScript, поэтому давайте посмотрим, как я решу эту проблему в Elixir.

Код

Первое ограничение, которое я перечислил выше, — это ограничение скорости. При применении ограничений скорости в конвейерах данных, подобных этому, разработчики Elixir обращаются к пакету GenStage. Это настолько распространенный вариант использования, что официальный репозиторий предоставляет пример ограничения скорости.

Я немного изменил этот код, чтобы он соответствовал моим потребностям:

elixir
defmodule SolanaExample.Client.Producer do
  @moduledoc """
 This module accepts new API requests and forwards them to the
  `SolanaExample.RateLimiter` for eventual execution.
 """
  use GenStage

  # API
  def start_link(config) do
    GenStage.start_link(__MODULE__, :ok, name: SolanaExample.Registry.via(name(config)))
  end

 # This process' name, so that other processes can find it
  def name(config) do
    Keyword.get(config, :network) <> "_producer"
  end

  def cast(producer, requests) do
    GenStage.cast(producer, {:send, requests, self()})
  end

  def call(producer, requests), do: GenStage.call(producer, {:send, requests})

  # GenStage callbacks
  def init(:ok) do
    {:producer, %{demand: 0, queue: []}}
  end

  def handle_cast({:send, requests, from}, state) do
    handle_call({:send, requests}, from, state)
  end

  def handle_call({:send, requests}, from, state) do
    {events, queue, demand} =
      state.queue
      |> add_to_queue(from, requests)
      |> fulfill_demand(state.demand)

    {:noreply, expand(events), %{queue: queue, demand: demand}}
  end

  def handle_demand(demand, state) do
    {events, queue, demand} =
      state.queue
      |> fulfill_demand(demand)

    {:noreply, expand(events), %{queue: queue, demand: demand}}
  end

  defp add_to_queue(queue, from, requests) do
    List.flatten([queue | [{from, List.wrap(requests)}]])
  end

  defp expand(events) do
    events
    |> Enum.map(fn {from, requests} -> Enum.map(requests, &{from, &1}) end)
    |> List.flatten()
  end

  defp fulfill_demand(queue, demand) do
    Enum.reduce_while(queue, {[], queue, demand}, fn
      {from, requests}, {to_emit, [_ | rest], demand} when length(requests) <= demand ->
        {:cont, {[{from, requests} | to_emit], rest, demand - length(requests)}}

      _, acc ->
        {:halt, acc}
    end)
  end
end

defmodule SolanaExample.Client.RateLimiter do
  @moduledoc """
  This module rate limits the API requests from the Producer to 100 every 10
  seconds.
  """
  use GenStage

  alias SolanaExample.{Registry, Client}

  # API
  def start_link(config) do
    GenStage.start_link(__MODULE__, config, name: Registry.via(name(config)))
  end

 # the registry's name for this process so other processes can find it
  def name(config) do
    Keyword.get(config, :network) <> "_ratelimiter"
  end

  # GenStage callbacks
  def init(config) do
    subscription = Registry.lookup(Client.Producer.name(config))
    {:producer_consumer, %{}, subscribe_to: [subscription]}
  end

  def handle_subscribe(:producer, opts, from, producers) do
    limit = Keyword.get(opts, :max_demand, 100)
    interval = Keyword.get(opts, :interval, 10_000)

    producers =
      producers
      |> Map.put(from, {limit, interval})
      |> ask_and_schedule(from)

    {:manual, producers}
  end

  def handle_subscribe(:consumer, _opts, _from, consumers) do
    {:automatic, consumers}
  end

  def handle_cancel(_, from, producers) do
    {:noreply, [], producers |> Map.delete(from)}
  end

  def handle_events(events, _from, producers) do
    {:noreply, events, producers}
  end

  def handle_info({:ask, from}, producers) do
    {:noreply, [], ask_and_schedule(producers, from)}
  end

  defp ask_and_schedule(producers, from) do
    case producers do
      %{^from => {limit, interval}} ->
        GenStage.ask(from, limit)
        Process.send_after(self(), {:ask, from}, interval)
        producers

      %{} ->
        producers
    end
  end
end

defmodule SolanaExample.Client.Consumer do
  @moduledoc """
  This module actually executes the rate-limited API requests.
  """
  use GenStage

  alias Solana.RPC
  alias SolanaExample.Client

  # API
  def start_link(config), do: GenStage.start_link(__MODULE__, config)

  # GenStage callbacks
  def init(config) do
    subscription = SolanaExample.Registry.lookup(Client.RateLimiter.name(config))
    adapter = {Tesla.Adapter.Gun, [certificates_verification: true]}
    opts = Keyword.merge(config, adapter: adapter)
    state = %{network: Keyword.get(config, :network), client: RPC.client(opts)}
    {:consumer, state, subscribe_to: [subscription]}
  end

  def handle_events(events, _from, state = %{client: client}) do
    events
    |> Enum.reduce(%{}, fn {from, request}, batches ->
      Map.update(batches, from, [request], &[request | &1])
    end)
    |> Enum.each(fn
      {from, requests} when is_pid(from) ->
     requests = Enum.reverse(requests)
        response = Enum.zip(RPC.Request.encode(requests), RPC.send(client, requests))
        send(from, {:rpc, state.network, response})

      {from, requests} ->
        GenStage.reply(from, RPC.send(client, Enum.reverse(requests)))
    end)

    {:noreply, [], state}
  end
end

Я не буду подробно рассматривать этот код, но его основная задача — принимать новые запросы API, ограничивать их скорость и отправлять их на конечную точку Solana RPC. На работе проекты постоянно сталкивались с ограничениями API, как только они попадали в рабочую среду; этот код предотвратит эту проблему.

Теперь все, что нам нужно, — это способ запустить все эти процессы и запустить наш первоначальный запрос.

Чтобы запустить все эти процессы со встроенным аварийное восстановление, вы можете контролировать их:

elixir

defmodule SolanaExample.Client do
  @moduledoc """
  Responsible for supervising the GenStage pipeline for a specific Solana
  cluster.
  """
  use Supervisor

  alias SolanaExample.{Client, Registry}

  def start_link(config) do
    Supervisor.start_link(__MODULE__, config, name: name(config))
  end

  def init(config) do
    children = [
      {Client.Producer, config},
      {Client.RateLimiter, config},
      {Client.Consumer, config}
    ]

    Supervisor.init(children, strategy: :rest_for_one)
  end

  def cast(name, requests) when is_binary(name) do
    lookup_and_run(name, requests, &cast/2)
  end

  def cast(client, requests) when is_pid(client) do
    find_child_and_run(client, requests, &Client.Producer.cast/2)
  end

  def call(name, requests) when is_binary(name) do
    lookup_and_run(name, requests, &call/2)
  end

  def call(client, requests) when is_pid(client) do
    find_child_and_run(client, requests, &Client.Producer.call/2)
  end

  defp lookup_and_run(name, requests, fun) do
    case Registry.lookup(name) do
      nil -> :error
      client -> fun.(client, requests)
    end
  end

  defp find_child_and_run(client, requests, fun) do
    client
    |> Supervisor.which_children()
    |> Enum.find(&(elem(&1, 0) == Client.Producer))
    |> case do
      nil -> :error
      {_, producer, _, _} -> fun.(producer, requests)
    end
  end

  defp name(config), do: Registry.via(Keyword.fetch!(config, :network))
end

Если вы хотите отслеживать несколько кластеров, то есть devnet против mainnet-beta,
вам, вероятно, следует иметь разные Client для каждого кластера, который вы хотите
отслеживать. Вы можете сделать это с помощью DynamicSupervisor:

elixir
defmodule SolanaExample.Client.Supervisor do
  @moduledoc """
  Responsible for managing `SolanaExample.Client` processes.
  """
  use DynamicSupervisor

  @me __MODULE__

  def start_link(_), do: DynamicSupervisor.start_link(@me, :ok, name: @me)

  @doc """
  retrieves or kicks off a `SolanaExample.Client` process based on the desired
  cluster.
  """
  def get_or_start_client(config) do
    case start_child(config) do
      {:error, {:already_started, pid}} -> {:ok, pid}
      other -> other
    end
  end

  def start_child(config) do
    DynamicSupervisor.start_child(@me, {SolanaExample.Client, config})
  end

  def init(_), do: DynamicSupervisor.init(strategy: :one_for_one)
end

Наконец, вот способ запустить асинхронный запрос:

elixir
defmodule SolanaExample.Runner do
  @moduledoc """
  Runs various asynchronous queries associated with collecting address
  transactions.
  """
  use GenServer

  @me __MODULE__
  @tx_limit 25

  alias Solana.RPC
  alias SolanaExample.Client

  def start_link(_), do: GenServer.start_link(@me, :ok, name: @me)

  @doc """
  Gets account information for an address, and kicks off an asynchronous
  workstream to retrieve every new transaction associated with this account.
  This runs in the background until there are no more transactions to process.
  """
  def sync(address, network) do
    with {:ok, decoded} <- B58.decode58(address),
         request = RPC.Request.get_account_info(decoded, encoding: "jsonParsed"),
         nil <- get_account_from_database(address, network),
         [{:ok, data}] <- Client.call(network, request) do
      add_account_to_database(address, network, %{data: data, updating?: true, last: nil})
      # 
      GenServer.cast(@me, {:sync, address, network})
      {:ok, data}
    else
      # account already in the database, but not updating: kick off a new update
      account = %{updating?: false} ->
        edit_account_in_database(address, network, updating?: true)
        GenServer.cast(@me, {:sync, address, network, List.wrap(account.last)})
        {:ok, account.contents}

      # account in database and updating: don't kick off a new update
      %{updating?: true, contents: data} ->
        {:ok, data}

      :error ->
        {:error, :invalid_client}

      error ->
        error
    end
  end

  # GenServer callbacks

  def init(:ok) do
    {:ok, nil}
  end

  def handle_cast({:sync, address, network, opts}, state) do
    opts = Keyword.merge(opts, limit: @tx_limit)

    request =
      address
      |> B58.decode58!()
      |> RPC.Request.get_signatures_for_address(opts)

    # kick off an async request to the Solana RPC endpoint
    Client.cast(network, request)
    {:noreply, state}
  end

  def handle_cast({:sync, address, network}, state) do
    request =
      address
      |> B58.decode58!()
      |> RPC.Request.get_signatures_for_address(limit: @tx_limit)

    # kick off an async request to the Solana RPC endpoint
    Client.cast(network, request)
    {:noreply, state}
  end

  # handles responses to async requests
  def handle_info({:rpc, network, response}, state) do
    Enum.reduce_while(response, {:noreply, state}, fn pair, _acc ->
      case handle_rpc(pair, network, state) do
        {:noreply, _} = result -> {:cont, result}
        other -> {:halt, other}
      end
    end)
  end

  # get transaction signatures
  defp handle_rpc(
         {%{method: "getSignaturesForAddress", params: [address, opts]}, {:ok, response}},
         network,
         state
       ) do
    Enum.each(response, &put_account_signature_in_database(address, &1, network))

    response
    |> Enum.map(&Map.get(&1, "signature"))
    |> Enum.filter(&is_nil(get_transaction_from_database(B58.encode58(&1), network)))
    |> Enum.map(&RPC.Request.get_transaction(&1, encoding: "jsonParsed"))
    |> case do
      [] -> :ok
      requests -> Client.cast(network, requests)
    end

    handle_signature_response(response, address, opts, network, state)
  end

  # get transaction details
  defp handle_rpc(
         {%{method: "getTransaction", params: [signature, _]}, {:ok, response}},
         network,
         state
       ) do
    put_transaction_in_database(signature, response, network)
    {:noreply, state}
  end

  defp handle_signature_response(
         response,
         address,
         %{"before" => _, "until" => until},
         network,
         state
       ) do
    handle_signature_response(response, address, %{"until" => until}, network, state)
  end

  # now that we've got all the transactions going back in time, switch to going
  # forward from our starting point
  defp handle_signature_response(response, address, %{"before" => _}, network, state)
       when length(response) < @tx_limit do
    latest = get_latest_account_signature_from_database(address, network)
    edit_account_in_database(address, network, last: {:until, latest})
    handle_cast({:sync, address, network, until: latest}, state)
  end

  # move on to the previous N transactions
  defp handle_signature_response(response, address, %{"before" => _}, network, state) do
    oldest = response |> Enum.reverse() |> List.first() |> Map.get("signature")
    edit_account_in_database(address, network, last: {:before, oldest})
    handle_cast({:sync, address, network, before: oldest}, state)
  end

  # base case: no more transactions to analyze
  defp handle_signature_response([], address, %{"until" => _}, network, state) do
    latest = get_latest_account_signature_from_database(address, network)
    edit_account_in_database(address, network, updating?: false, last: {:until, latest})
    {:noreply, state}
  end

  # move on to the previous N transactions starting at the earliest transaction
  # just pulled and going until `until`
  defp handle_signature_response(response, address, %{"until" => until}, network, state) do
    earliest = response |> Enum.reverse() |> List.first() |> Map.get("signature")
    edit_account_in_database(address, network, last: [until: until, before: earliest])
    handle_cast({:sync, address, network, until: until, before: earliest}, state)
  end

  # kick off the initial load
  defp handle_signature_response(response, address, _, network, state) do
    handle_signature_response(response, address, %{"before" => nil}, network, state)
  end
end

defmodule SolanaExample do
  @doc """
  queries the Solana `network` for information about an `address`.
  """
  def query(address, opts) do
    case SolanaExample.Client.Supervisor.start_child(opts) do
      {:error, reason} when elem(reason, 0) != :already_started ->
        {:error, reason}

      _ ->
        SolanaExample.Runner.sync(address, opts[:network])
    end
  end
end

Все, что заканчивается на _in_database, включает в себя размещение записей и
извлечение записей из базы данных по вашему выбору. Я бы рекомендовал cubdb.

Запуск SolanaExample.query(address, network: "mainnet-beta") запускает процесс для получения всех транзакций, связанных с адресом, в фоновом режиме. Тем временем вы можете запрашивать другие адреса или выполнять другие команды в своей программе. Как только фоновый процесс завершает свою работу, у вас есть база данных, полная транзакций для агрегирования и обработки.

Заключение

Это весь код, который вам понадобится для сбора транзакций, связанных с адресом. Скорее всего, вы можете сделать это в JavaScript, но это может быть сложнее, чем решение Elixir.

Этот пример лишь поверхностно описывает возможности пакета solana. Он также включает в себя полезные утилиты для написания и тестирования ваших собственных программ-клиентов в Elixir. Ознакомьтесь с документацией, чтобы узнать больше!

P.S. Если вам нужно взаимодействовать с библиотекой программ Solana, вы можете использовать пакет solana_spl. Ознакомьтесь с его документацией, если она вам нужна!