Исключение Вторая операция, запущенная в этом контексте до завершения предыдущей операции, вызвана при использовании обработчика событий.

У меня есть обработчик событий в моем проекте консольного приложения С#, который запускается при поступлении обновления от пользователя (сценарий чат-бота). Проблема в том, что даже если я использую await внутри обработчика событий, я все равно получаю исключение (недопустимое исключение операции), когда код достигает точки, в которой он хочет получить пользовательские данные из базы данных.

С другой стороны, когда я отпускаю обработчик событий и использую метод длительного опроса для получения обновлений, я не сталкиваюсь с этой проблемой. Я думаю, может быть, этот обработчик событий создает новый поток для каждого получаемого им обновления, поэтому возникает это исключение. Мне было интересно, могу ли я использовать обработчики событий и не сталкиваться с этой проблемой? Вот мой код:

 public class TelegramService : IChatbotService
    {
        private readonly ILogger _logger;
        private readonly ITelegramBotClientFactory _telegramBotFactory;
        private ITelegramBotClient _telegramBotClient;
        internal static User Me;
        private readonly IChatbotUpdateHandler _chatbotUpdateHandler;
        private readonly ISettingService _settingService;

        public TelegramService(ITelegramBotClientFactory telegramBotClientFactory, ILogger<TelegramService> logger,
            IChatbotUpdateHandler chatbotUpdateHandler)
        {
            _logger = logger;
            _telegramBotFactory = telegramBotClientFactory;
            _chatbotUpdateHandler = chatbotUpdateHandler;

        }
        public async Task<bool> Run()
        {

            try
            {
                _telegramBotClient = _telegramBotFactory.CreateBotClient();

                await _telegramBotClient.DeleteWebhookAsync();


                Me = await _telegramBotClient.GetMeAsync();
            }
            catch (Exception e)
            {
                _logger.LogError($"502 bad gateway, restarting in 2 seconds:\n\n{e.Message}", e.Message);
                Thread.Sleep(TimeSpan.FromSeconds(2));
                //API is down... 
                return true;
            }
          
            _telegramBotClient.OnUpdate += BotOnUpdateReceived; // event handler

            _telegramBotClient.StartReceiving();

            return false;
        }

  private async void BotOnUpdateReceived(object sender, UpdateEventArgs args)
        {
            var update = args.Update;
            if (update.Type == UpdateType.InlineQuery) return;
            if (update.Type == UpdateType.CallbackQuery) return;

            await _chatbotUpdateHandler.Handle(update);

        }

}

public class TelegramUpdateHandler : IChatbotUpdateHandler
    {

        private Update _update;
        private readonly ILogger<TelegramUpdateHandler> _logger;
        private readonly IUserService _userService;
        private readonly IChatProcessorFactory _chatProcessorFactory;
        private readonly IUserMessagingService _userMessagingService;

        public TelegramUpdateHandler(ILogger<TelegramUpdateHandler> logger, IUserService userService,
            IChatProcessorFactory chatProcessorFactory, IUserMessagingService userMessagingService)
        {
            _logger = logger;
            _userService = userService;
            _chatProcessorFactory = chatProcessorFactory;
            _userMessagingService = userMessagingService;
        }
        public async Task Handle(object updateObject)
        {
            
            try
            {
                var botUser = await GetUser();
               
                await ProcessUpdate(botUser);


            }
            catch (UnAuthorizedException e)
            {
                //User is grounded or does not have access to bot
                _logger.LogInformation($"User is unauthorized to access the bot:\n{e.Message}");
            }
            catch (Exception e)
            {
                _logger.LogError($"Error occured at Handle:\n{e.Message}");
            }

        }

        private async Task<BotUser> GetUser()
        {
            BotUser botUser = null;

            try
            {
                botUser = await _userService.FetchUser(_update.Message.From.Id);
                //Exception is thrown when calling "FetchUser" when second update comes here.
            }
            catch (InvalidOperationException e)
            {
                botUser = _userService.CreateNewBotUser(_update.Message.From);

                botUser = await _userService.AddUserToDb(botUser);
            }

            return botUser;
        }
    }
    

    [Export(typeof(IUserService))]
    public class UserService : IUserService
    {
        private readonly ILanguageService _languageService;
        private readonly IUnitOfWork _unitOfWork;
        private readonly ITelegramApiService _telegramApiService;

        [ImportingConstructor]
        public UserService(ITelegramApiService telegramApiService, ILanguageService languageService, IUnitOfWork unitOfWork)
        {
            _telegramApiService = telegramApiService;
            _languageService = languageService;
            _unitOfWork = unitOfWork;
        }

        public async Task<BotUser> FetchUser(int userId)
        {

            return await _unitOfWork.Users.Fetch(userId);

        }
     }

Если я изменю приведенное выше на что-то вроде приведенного ниже, у меня не возникнет никаких проблем:

var updates = await GetUpdates(); 
//Through long polling we get the updates rather than using an event handler.

            if (updates.Length > 0)
              {
                  HandleUpdates(updates.ToList());   

                  lastUpdateID = updates[^1].Id;
              }

///........

private async Task HandleUpdates(List<Update> updates)
        {
            foreach (var item in updates)
            {
                if (item.Type == UpdateType.InlineQuery) continue;
                if (item.Type == UpdateType.CallbackQuery) continue;

                await _chatbotUpdateHandler.Handle(item);

            }
        }

/// The rest is similar to the previous version

PS*: я также зарегистрировал все свои сервисы как Transient


person Moji    schedule 18.01.2021    source источник
comment
Пожалуйста, поделитесь минимально воспроизводимым примером.   -  person mjwills    schedule 18.01.2021
comment
@mjwills Я добавил больше кода, чтобы уточнить точный путь, по которому выдается исключение.   -  person Moji    schedule 18.01.2021
comment
Недостаточно кода. Когда я могу скопировать и вставить его и запустить, это минимально воспроизводимый пример. Если я не могу, это (пока) не минимально воспроизводимый пример.   -  person mjwills    schedule 18.01.2021
comment
@mjwills снова отредактировано. Надеюсь, этого достаточно.   -  person Moji    schedule 18.01.2021


Ответы (1)


обработчик событий в моем проекте консольного приложения С#

async void был разработан для обработчиков событий, но предполагается, что они похожи на обработчики событий пользовательского интерфейса:

  1. await захватывает текущий контекст и возобновляет работу в этом контексте.
  2. Исключения, вызванные из методов async void, повторно вызываются в методе SynchronizationContext, который присутствовал в начале метода.
  3. Невозможно await использовать async void метод; пользовательский интерфейс просто возвращается к своему циклу сообщений.

В мире пользовательского интерфейса оба этих поведения имеют смысл и приводят к тому, что обработчик событий async void имеет семантику, схожую с обработчиком событий, отличным от async. В мире консоли эти действия приводят к следующей семантике (соответственно):

  1. После каждого await обработчик возобновляет выполнение в потоке пула потоков.
  2. Исключения, вызванные async void методами, повторно вызываются непосредственно в пуле потоков, что приведет к сбою процесса.
  3. Невозможно await использовать async void метод, поэтому ваш код не может (легко) узнать, когда он завершен.

Таким образом, async обработка событий в консольных процессах работает не так хорошо, как в среде пользовательского интерфейса. Тем не менее, вы можете использовать их, если хотите; вам просто нужно знать об этой семантике.

В частности, поскольку await возобновится в потоке пула потоков, чтобы избежать второго исключения операции, вам потребуется:

  1. Предоставьте отдельный экземпляр DbContext каждому обработчику событий.
  2. Измените обработчик событий, чтобы он поддерживал (асинхронные) уведомления о том, что обработчики событий завершены (например, с использованием отсрочек).
  3. Рефакторинг кода, чтобы события помещались в очередь (например, Channel<T>), которая обрабатывается компонентом BackgroundService.

Пример использования первого подхода (создание нового DbContext для каждого обработчика):

private async void BotOnUpdateReceived(object sender, UpdateEventArgs args)
{
  var update = args.Update;
  if (update.Type == UpdateType.InlineQuery) return;
  if (update.Type == UpdateType.CallbackQuery) return;

  var chatbotUpdateHandler = _serviceProvider.GetRequiredService<IChatbotUpdateHandler>();
  await chatbotUpdateHandler.Handle(update);
}
person Stephen Cleary    schedule 18.01.2021
comment
С удовольствием прочитал ваш ответ. Я продолжил ваш первый подход, и он работает нормально. Я думал, что, поскольку я уже определил IChatboUpdateHandler как временную службу, я буду получать отдельный объект каждый раз, когда вызывается обработчик событий. Это было неправильно, потому что TelegramService запускается в начале приложения и создается только один раз за время жизни приложения, поэтому IChatbotHandler также создавался только один раз. Хитрость заключалась в том, чтобы получать новый IChatbotHandler каждый раз, когда возникает событие, как вы прекрасно упомянули! - person Moji; 19.01.2021