Внедрение Elixir в экосистему Solana и почему вы можете захотеть создать свой следующий программный клиент в Elixir
Дерек Мир
Я рад объявить о первых выпусках двух новых пакетов Elixir: solana
, неофициальный клиент Solana API, и solana_spl
, неофициальный интерфейс Solana Program Library.
Почему?
До того, как я начал работать в венчурной студии DeFi, я работал над веб-приложениями, написанными на Эликсире. Elixir — отличный выбор для создания современных отказоустойчивых параллельных приложений, таких как приложения Web2 или конвейеры данных. Я отметил несколько случаев в работе, которые могли бы выиграть от использования библиотеки Elixir или языковой функции. Недавно, проводя исследование для нового проекта, я нашел один случай, который настолько хорошо соответствует сильным сторонам Elixir, что решил самостоятельно запустить экосистему Elixir-Solana.
Примечание. остальная часть этой статьи предполагает, что у вас есть некоторое представление об Эликсире. Если вы хотите узнать больше, официальный сайт и документация — хорошие места для начала.
Проблема
Соберите все транзакции, связанные с токенами, и запустите на них
proprietary operations
.
Solana предоставляет несколько методов API для этого: getSignaturesForAccount
, который возвращает список подписей транзакций, и getTransaction
, который возвращает детали транзакции с учетом ее подписи. Вам нужно вызвать getSignaturesForAccount
для монетного двора токена, который вы хотите проанализировать, затем для каждого элемента в ответе вы вызываете getTransaction
, чтобы получить подробности.
Некоторые дополнительные ограничения:
- большинство официальных конечных точек имеют ограничение в 100 запросов каждые 10 секунд на IP.
getSignaturesForAccount
возвращает максимум 1000 подписей; большинство крупных дистрибутивов токенов имеют на порядок больше транзакций, чем это.
Я не знаю, как лучше всего это сделать в JavaScript, поэтому давайте посмотрим, как я решу эту проблему в Elixir.
Код
Первое ограничение, которое я перечислил выше, — это ограничение скорости. При применении ограничений скорости в конвейерах данных, подобных этому, разработчики Elixir обращаются к пакету GenStage
. Это настолько распространенный вариант использования, что официальный репозиторий предоставляет пример ограничения скорости.
Я немного изменил этот код, чтобы он соответствовал моим потребностям:
elixir defmodule SolanaExample.Client.Producer do @moduledoc """ This module accepts new API requests and forwards them to the `SolanaExample.RateLimiter` for eventual execution. """ use GenStage # API def start_link(config) do GenStage.start_link(__MODULE__, :ok, name: SolanaExample.Registry.via(name(config))) end # This process' name, so that other processes can find it def name(config) do Keyword.get(config, :network) <> "_producer" end def cast(producer, requests) do GenStage.cast(producer, {:send, requests, self()}) end def call(producer, requests), do: GenStage.call(producer, {:send, requests}) # GenStage callbacks def init(:ok) do {:producer, %{demand: 0, queue: []}} end def handle_cast({:send, requests, from}, state) do handle_call({:send, requests}, from, state) end def handle_call({:send, requests}, from, state) do {events, queue, demand} = state.queue |> add_to_queue(from, requests) |> fulfill_demand(state.demand) {:noreply, expand(events), %{queue: queue, demand: demand}} end def handle_demand(demand, state) do {events, queue, demand} = state.queue |> fulfill_demand(demand) {:noreply, expand(events), %{queue: queue, demand: demand}} end defp add_to_queue(queue, from, requests) do List.flatten([queue | [{from, List.wrap(requests)}]]) end defp expand(events) do events |> Enum.map(fn {from, requests} -> Enum.map(requests, &{from, &1}) end) |> List.flatten() end defp fulfill_demand(queue, demand) do Enum.reduce_while(queue, {[], queue, demand}, fn {from, requests}, {to_emit, [_ | rest], demand} when length(requests) <= demand -> {:cont, {[{from, requests} | to_emit], rest, demand - length(requests)}} _, acc -> {:halt, acc} end) end end defmodule SolanaExample.Client.RateLimiter do @moduledoc """ This module rate limits the API requests from the Producer to 100 every 10 seconds. """ use GenStage alias SolanaExample.{Registry, Client} # API def start_link(config) do GenStage.start_link(__MODULE__, config, name: Registry.via(name(config))) end # the registry's name for this process so other processes can find it def name(config) do Keyword.get(config, :network) <> "_ratelimiter" end # GenStage callbacks def init(config) do subscription = Registry.lookup(Client.Producer.name(config)) {:producer_consumer, %{}, subscribe_to: [subscription]} end def handle_subscribe(:producer, opts, from, producers) do limit = Keyword.get(opts, :max_demand, 100) interval = Keyword.get(opts, :interval, 10_000) producers = producers |> Map.put(from, {limit, interval}) |> ask_and_schedule(from) {:manual, producers} end def handle_subscribe(:consumer, _opts, _from, consumers) do {:automatic, consumers} end def handle_cancel(_, from, producers) do {:noreply, [], producers |> Map.delete(from)} end def handle_events(events, _from, producers) do {:noreply, events, producers} end def handle_info({:ask, from}, producers) do {:noreply, [], ask_and_schedule(producers, from)} end defp ask_and_schedule(producers, from) do case producers do %{^from => {limit, interval}} -> GenStage.ask(from, limit) Process.send_after(self(), {:ask, from}, interval) producers %{} -> producers end end end defmodule SolanaExample.Client.Consumer do @moduledoc """ This module actually executes the rate-limited API requests. """ use GenStage alias Solana.RPC alias SolanaExample.Client # API def start_link(config), do: GenStage.start_link(__MODULE__, config) # GenStage callbacks def init(config) do subscription = SolanaExample.Registry.lookup(Client.RateLimiter.name(config)) adapter = {Tesla.Adapter.Gun, [certificates_verification: true]} opts = Keyword.merge(config, adapter: adapter) state = %{network: Keyword.get(config, :network), client: RPC.client(opts)} {:consumer, state, subscribe_to: [subscription]} end def handle_events(events, _from, state = %{client: client}) do events |> Enum.reduce(%{}, fn {from, request}, batches -> Map.update(batches, from, [request], &[request | &1]) end) |> Enum.each(fn {from, requests} when is_pid(from) -> requests = Enum.reverse(requests) response = Enum.zip(RPC.Request.encode(requests), RPC.send(client, requests)) send(from, {:rpc, state.network, response}) {from, requests} -> GenStage.reply(from, RPC.send(client, Enum.reverse(requests))) end) {:noreply, [], state} end end
Я не буду подробно рассматривать этот код, но его основная задача — принимать новые запросы API, ограничивать их скорость и отправлять их на конечную точку Solana RPC. На работе проекты постоянно сталкивались с ограничениями API, как только они попадали в рабочую среду; этот код предотвратит эту проблему.
Теперь все, что нам нужно, — это способ запустить все эти процессы и запустить наш первоначальный запрос.
Чтобы запустить все эти процессы со встроенным аварийное восстановление, вы можете контролировать их:
elixir defmodule SolanaExample.Client do @moduledoc """ Responsible for supervising the GenStage pipeline for a specific Solana cluster. """ use Supervisor alias SolanaExample.{Client, Registry} def start_link(config) do Supervisor.start_link(__MODULE__, config, name: name(config)) end def init(config) do children = [ {Client.Producer, config}, {Client.RateLimiter, config}, {Client.Consumer, config} ] Supervisor.init(children, strategy: :rest_for_one) end def cast(name, requests) when is_binary(name) do lookup_and_run(name, requests, &cast/2) end def cast(client, requests) when is_pid(client) do find_child_and_run(client, requests, &Client.Producer.cast/2) end def call(name, requests) when is_binary(name) do lookup_and_run(name, requests, &call/2) end def call(client, requests) when is_pid(client) do find_child_and_run(client, requests, &Client.Producer.call/2) end defp lookup_and_run(name, requests, fun) do case Registry.lookup(name) do nil -> :error client -> fun.(client, requests) end end defp find_child_and_run(client, requests, fun) do client |> Supervisor.which_children() |> Enum.find(&(elem(&1, 0) == Client.Producer)) |> case do nil -> :error {_, producer, _, _} -> fun.(producer, requests) end end defp name(config), do: Registry.via(Keyword.fetch!(config, :network)) end
Если вы хотите отслеживать несколько кластеров, то есть devnet
против mainnet-beta
,
вам, вероятно, следует иметь разные Client
для каждого кластера, который вы хотите
отслеживать. Вы можете сделать это с помощью DynamicSupervisor
:
elixir defmodule SolanaExample.Client.Supervisor do @moduledoc """ Responsible for managing `SolanaExample.Client` processes. """ use DynamicSupervisor @me __MODULE__ def start_link(_), do: DynamicSupervisor.start_link(@me, :ok, name: @me) @doc """ retrieves or kicks off a `SolanaExample.Client` process based on the desired cluster. """ def get_or_start_client(config) do case start_child(config) do {:error, {:already_started, pid}} -> {:ok, pid} other -> other end end def start_child(config) do DynamicSupervisor.start_child(@me, {SolanaExample.Client, config}) end def init(_), do: DynamicSupervisor.init(strategy: :one_for_one) end
Наконец, вот способ запустить асинхронный запрос:
elixir defmodule SolanaExample.Runner do @moduledoc """ Runs various asynchronous queries associated with collecting address transactions. """ use GenServer @me __MODULE__ @tx_limit 25 alias Solana.RPC alias SolanaExample.Client def start_link(_), do: GenServer.start_link(@me, :ok, name: @me) @doc """ Gets account information for an address, and kicks off an asynchronous workstream to retrieve every new transaction associated with this account. This runs in the background until there are no more transactions to process. """ def sync(address, network) do with {:ok, decoded} <- B58.decode58(address), request = RPC.Request.get_account_info(decoded, encoding: "jsonParsed"), nil <- get_account_from_database(address, network), [{:ok, data}] <- Client.call(network, request) do add_account_to_database(address, network, %{data: data, updating?: true, last: nil}) # GenServer.cast(@me, {:sync, address, network}) {:ok, data} else # account already in the database, but not updating: kick off a new update account = %{updating?: false} -> edit_account_in_database(address, network, updating?: true) GenServer.cast(@me, {:sync, address, network, List.wrap(account.last)}) {:ok, account.contents} # account in database and updating: don't kick off a new update %{updating?: true, contents: data} -> {:ok, data} :error -> {:error, :invalid_client} error -> error end end # GenServer callbacks def init(:ok) do {:ok, nil} end def handle_cast({:sync, address, network, opts}, state) do opts = Keyword.merge(opts, limit: @tx_limit) request = address |> B58.decode58!() |> RPC.Request.get_signatures_for_address(opts) # kick off an async request to the Solana RPC endpoint Client.cast(network, request) {:noreply, state} end def handle_cast({:sync, address, network}, state) do request = address |> B58.decode58!() |> RPC.Request.get_signatures_for_address(limit: @tx_limit) # kick off an async request to the Solana RPC endpoint Client.cast(network, request) {:noreply, state} end # handles responses to async requests def handle_info({:rpc, network, response}, state) do Enum.reduce_while(response, {:noreply, state}, fn pair, _acc -> case handle_rpc(pair, network, state) do {:noreply, _} = result -> {:cont, result} other -> {:halt, other} end end) end # get transaction signatures defp handle_rpc( {%{method: "getSignaturesForAddress", params: [address, opts]}, {:ok, response}}, network, state ) do Enum.each(response, &put_account_signature_in_database(address, &1, network)) response |> Enum.map(&Map.get(&1, "signature")) |> Enum.filter(&is_nil(get_transaction_from_database(B58.encode58(&1), network))) |> Enum.map(&RPC.Request.get_transaction(&1, encoding: "jsonParsed")) |> case do [] -> :ok requests -> Client.cast(network, requests) end handle_signature_response(response, address, opts, network, state) end # get transaction details defp handle_rpc( {%{method: "getTransaction", params: [signature, _]}, {:ok, response}}, network, state ) do put_transaction_in_database(signature, response, network) {:noreply, state} end defp handle_signature_response( response, address, %{"before" => _, "until" => until}, network, state ) do handle_signature_response(response, address, %{"until" => until}, network, state) end # now that we've got all the transactions going back in time, switch to going # forward from our starting point defp handle_signature_response(response, address, %{"before" => _}, network, state) when length(response) < @tx_limit do latest = get_latest_account_signature_from_database(address, network) edit_account_in_database(address, network, last: {:until, latest}) handle_cast({:sync, address, network, until: latest}, state) end # move on to the previous N transactions defp handle_signature_response(response, address, %{"before" => _}, network, state) do oldest = response |> Enum.reverse() |> List.first() |> Map.get("signature") edit_account_in_database(address, network, last: {:before, oldest}) handle_cast({:sync, address, network, before: oldest}, state) end # base case: no more transactions to analyze defp handle_signature_response([], address, %{"until" => _}, network, state) do latest = get_latest_account_signature_from_database(address, network) edit_account_in_database(address, network, updating?: false, last: {:until, latest}) {:noreply, state} end # move on to the previous N transactions starting at the earliest transaction # just pulled and going until `until` defp handle_signature_response(response, address, %{"until" => until}, network, state) do earliest = response |> Enum.reverse() |> List.first() |> Map.get("signature") edit_account_in_database(address, network, last: [until: until, before: earliest]) handle_cast({:sync, address, network, until: until, before: earliest}, state) end # kick off the initial load defp handle_signature_response(response, address, _, network, state) do handle_signature_response(response, address, %{"before" => nil}, network, state) end end defmodule SolanaExample do @doc """ queries the Solana `network` for information about an `address`. """ def query(address, opts) do case SolanaExample.Client.Supervisor.start_child(opts) do {:error, reason} when elem(reason, 0) != :already_started -> {:error, reason} _ -> SolanaExample.Runner.sync(address, opts[:network]) end end end
Все, что заканчивается на _in_database
, включает в себя размещение записей и
извлечение записей из базы данных по вашему выбору. Я бы рекомендовал cubdb
.
Запуск SolanaExample.query(address, network: "mainnet-beta")
запускает процесс для получения всех транзакций, связанных с адресом, в фоновом режиме. Тем временем вы можете запрашивать другие адреса или выполнять другие команды в своей программе. Как только фоновый процесс завершает свою работу, у вас есть база данных, полная транзакций для агрегирования и обработки.
Заключение
Это весь код, который вам понадобится для сбора транзакций, связанных с адресом. Скорее всего, вы можете сделать это в JavaScript, но это может быть сложнее, чем решение Elixir.
Этот пример лишь поверхностно описывает возможности пакета solana
. Он также включает в себя полезные утилиты для написания и тестирования ваших собственных программ-клиентов в Elixir. Ознакомьтесь с документацией, чтобы узнать больше!
P.S. Если вам нужно взаимодействовать с библиотекой программ Solana, вы можете использовать пакет solana_spl
. Ознакомьтесь с его документацией, если она вам нужна!