Camel MDC Logback Устаревшая информация в недостаточном объеме

У нас есть высоконагруженное приложение Apache Camel, которое использует логбэк / MDC для регистрации информации. Мы обнаружили, что некоторая информация о MDC в потоках устарела, о чем уже говорилось в документации по журналированию. Я нашел этот вопрос SO, который касается этой проблемы:

Как использовать MDC с пулами потоков?

Как мы должны применить это к нашему приложению для верблюда, чтобы избежать устаревшей информации? Есть ли простое глобальное изменение ThreadPoolExecutor по умолчанию на пользовательский вариант, как это предлагается в связанном вопросе? Я вижу, вы можете сделать это для самих пулов, но не видел примеров для исполнителя. Имейте в виду, что наше приложение довольно велико и ежедневно обслуживает большой объем заказов - я бы хотел, чтобы влияние на существующее приложение было минимальным.


person Dakota Brown    schedule 19.10.2015    source источник


Ответы (1)


Я понял это и хотел опубликовать то, что сделал, на случай, если это принесет пользу кому-то другому. Обратите внимание, я использую JDK 6 / camel2.13.2

  • Верблюд имеет DefaultExecutorServiceManager, который использует DefaultThreadPoolFactory. Я расширил фабрику по умолчанию до MdcThreadPoolFactory

  • DefaultThreadPoolFactory имеет методы для генерации RejectableThreadPoolExecutors и RejectableScheduledThreadPoolExecutors. Я расширил оба из них до версий Mdc *, которые переопределяют метод execute(), чтобы обернуть Runnable и передать информацию MDC между потоками (как указано ссылкой в ​​моем исходном вопросе).

  • Я создал экземпляр bean-компонента MdcThreadPoolFactory в своей конфигурации приложения, который автоматически выбирается Camel и используется в ExecutorServiceManager

MdcThreadPoolExecutor:

package com.mypackage.concurrent

import org.apache.camel.util.concurrent.RejectableThreadPoolExecutor
import org.slf4j.MDC;

import java.util.Map;
import java.util.concurrent.*;

/**
 * A SLF4J MDC-compatible {@link ThreadPoolExecutor}.
 * <p/>
 * In general, MDC is used to store diagnostic information (e.g. a user's session id) in per-thread variables, to facilitate
 * logging. However, although MDC data is passed to thread children, this doesn't work when threads are reused in a
 * thread pool. This is a drop-in replacement for {@link ThreadPoolExecutor} sets MDC data before each task appropriately.
 * <p/>
 * Created by broda20.
 * Date: 10/29/15
 */
public class MdcThreadPoolExecutor extends RejectableThreadPoolExecutor {

    @SuppressWarnings("unchecked")
    private Map<String, Object> getContextForTask() {
        return MDC.getCopyOfContextMap();
    }

    public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    /**
     * All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code submit()} etc.)
     * all delegate to this.
     */
    @Override
    public void execute(Runnable command) {
        super.execute(wrap(command, getContextForTask()));
    }

    public static Runnable wrap(final Runnable runnable, final Map<String, Object> context) {
        return new Runnable() {
            @Override
            public void run() {
                Map previous = MDC.getCopyOfContextMap();
                if (context == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(context);
                }
                try {
                    runnable.run();
                } finally {
                    if (previous == null) {
                        MDC.clear();
                    } else {
                        MDC.setContextMap(previous);
                    }
                }
            }
        };
    }
}

MdcScheduledThreadPoolExecutor:

package com.mypackage.concurrent

import org.apache.camel.util.concurrent.RejectableScheduledThreadPoolExecutor
import org.slf4j.MDC;

import java.util.Map;
import java.util.concurrent.*;

/**
 * A SLF4J MDC-compatible {@link ThreadPoolExecutor}.
 * <p/>
 * In general, MDC is used to store diagnostic information (e.g. a user's session id) in per-thread variables, to facilitate
 * logging. However, although MDC data is passed to thread children, this doesn't work when threads are reused in a
 * thread pool. This is a drop-in replacement for {@link ThreadPoolExecutor} sets MDC data before each task appropriately.
 * <p/>
 * Created by broda20.
 * Date: 10/29/15
 */
public class MdcScheduledThreadPoolExecutor extends RejectableScheduledThreadPoolExecutor {

    @SuppressWarnings("unchecked")
    private Map<String, Object> getContextForTask() {
        return MDC.getCopyOfContextMap();
    }

    public MdcScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize);
    }

    public MdcScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
        super(corePoolSize, threadFactory);
    }

    public MdcScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
        super(corePoolSize, handler);
    }

    public MdcScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, threadFactory, handler);
    }

    /**
     * All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code submit()} etc.)
     * all delegate to this.
     */
    @Override
    public void execute(Runnable command) {
        super.execute(wrap(command, getContextForTask()));
    }

    public static Runnable wrap(final Runnable runnable, final Map<String, Object> context) {
        return new Runnable() {
            @Override
            public void run() {
                Map previous = MDC.getCopyOfContextMap();
                if (context == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(context);
                }
                try {
                    runnable.run();
                } finally {
                    if (previous == null) {
                        MDC.clear();
                    } else {
                        MDC.setContextMap(previous);
                    }
                }
            }
        };
    }
}

MdcThreadPoolFactory:

package com.mypackage.concurrent

import org.apache.camel.impl.DefaultThreadPoolFactory
import org.apache.camel.spi.ThreadPoolProfile
import org.apache.camel.util.concurrent.SizedScheduledExecutorService
import org.slf4j.MDC;

import java.util.Map;
import java.util.concurrent.*;

public class MdcThreadPoolFactory extends DefaultThreadPoolFactory {

    @SuppressWarnings("unchecked")
    private Map<String, Object> getContextForTask() {
        return MDC.getCopyOfContextMap();
    }


    public ExecutorService newThreadPool(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, int maxQueueSize, boolean allowCoreThreadTimeOut,
                                             RejectedExecutionHandler rejectedExecutionHandler, ThreadFactory threadFactory) throws IllegalArgumentException {

            // the core pool size must be 0 or higher
            if (corePoolSize < 0) {
               throw new IllegalArgumentException("CorePoolSize must be >= 0, was " + corePoolSize);
            }

            // validate max >= core
            if (maxPoolSize < corePoolSize) {
                throw new IllegalArgumentException("MaxPoolSize must be >= corePoolSize, was " + maxPoolSize + " >= " + corePoolSize);
            }

            BlockingQueue<Runnable> workQueue;
            if (corePoolSize == 0 && maxQueueSize <= 0) {
                // use a synchronous queue for direct-handover (no tasks stored on the queue)
                workQueue = new SynchronousQueue<Runnable>();
                // and force 1 as pool size to be able to create the thread pool by the JDK
                corePoolSize = 1;
                maxPoolSize = 1;
            } else if (maxQueueSize <= 0) {
                // use a synchronous queue for direct-handover (no tasks stored on the queue)
                workQueue = new SynchronousQueue<Runnable>();
            } else {
                // bounded task queue to store tasks on the queue
                workQueue = new LinkedBlockingQueue<Runnable>(maxQueueSize);
            }

            ThreadPoolExecutor answer = new MdcThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, timeUnit, workQueue);
            answer.setThreadFactory(threadFactory);
            answer.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
            if (rejectedExecutionHandler == null) {
                rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
            }
            answer.setRejectedExecutionHandler(rejectedExecutionHandler);
            return answer;
        }

        @Override
        public ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) {
            RejectedExecutionHandler rejectedExecutionHandler = profile.getRejectedExecutionHandler();
            if (rejectedExecutionHandler == null) {
                rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
            }

            ScheduledThreadPoolExecutor answer = new MdcScheduledThreadPoolExecutor(profile.getPoolSize(), threadFactory, rejectedExecutionHandler);
            //JDK7: answer.setRemoveOnCancelPolicy(true);

            // need to wrap the thread pool in a sized to guard against the problem that the
            // JDK created thread pool has an unbounded queue (see class javadoc), which mean
            // we could potentially keep adding tasks, and run out of memory.
            if (profile.getMaxPoolSize() > 0) {
                return new SizedScheduledExecutorService(answer, profile.getMaxQueueSize());
            } else {
                return answer;
            }
        }
}

И, наконец, экземпляр bean-компонента:

<bean id="mdcThreadPoolFactory" class="com.mypackage.concurrent.MdcThreadPoolFactory"/>
person Dakota Brown    schedule 29.10.2015
comment
Чтобы заставить это работать в Camel 2.16.3 для новых потоков, запрошенных org.apache.camel.util.component.AbstractApiProducer.process (Exchange, AsyncCallback), мне также пришлось переопределить java.util.concurrent.ScheduledThreadPoolExecutor.submit (Runnable) - person Paul M; 27.06.2016
comment
Впоследствии я изменил это, чтобы переопределить общедоступное расписание ScheduledFuture ‹?› (Runnable command, long delay, TimeUnit unit), которое представляет собой то, что submit () и execute () с делегатом (по крайней мере, в JDK8). Я думаю, что это будет хорошая идея для core camel. Если я найду время, чтобы разобраться с этим, вы согласны с подписанием авторских прав на Apache (или любой другой лицензионной работы, которая требуется)? - person Paul M; 29.06.2016
comment
Меня это полностью устраивает. Все, что угодно, чтобы Camel стал лучше в долгосрочной перспективе :) - person Dakota Brown; 29.06.2016
comment
@PaulM - это вошло в верблюжий ядро? Я сталкиваюсь с аналогичной проблемой с компонентом CXF в асинхронном режиме, в результате чего разные данные MDC регистрируются после возникновения ошибок, и мне интересно, может ли это решение помочь. Спасибо - person Tom Bunting; 31.01.2019