Size: a a a

2020 July 15

nn

nasdaq nice in pro.jvm
сделай тредПул как consumer
создай ресурс(объект обертка) запихни в него мапу, он будет выдавать елемент из очереди и лочить внутри себя по ID
вооот остается только пайплайн организовать
источник

AE

Alexandr Emelyanov in pro.jvm
Yury Golikov
Посмотрел поверхностно. Оно же блокирует поток как я понял. Мне не нужно на каждого юзера по потоку
так на каждого по потоку и по очереди
источник

AE

Alexandr Emelyanov in pro.jvm
Alexandr Emelyanov
либо акторная система, в корутинах есть каналы
что бы не тратить потоки в пустую надо исопльзовать это
источник

AE

Alexandr Emelyanov in pro.jvm
либо ждать лума, может запилят то же самое для коллекций
источник

YG

Yury Golikov in pro.jvm
Alexandr Emelyanov
так на каждого по потоку и по очереди
Ну тогда если у меня в системе лям юзеров, мне нужно будет держать всегда лям потоков?)
источник

YG

Yury Golikov in pro.jvm
nasdaq nice
сделай тредПул как consumer
создай ресурс(объект обертка) запихни в него мапу, он будет выдавать елемент из очереди и лочить внутри себя по ID
вооот остается только пайплайн организовать
А как устроен внутри будет эта обертка не оч понял. Что значит лочить в этом контексте?
источник

AE

Alexandr Emelyanov in pro.jvm
Yury Golikov
Ну тогда если у меня в системе лям юзеров, мне нужно будет держать всегда лям потоков?)
так у тебя не по пулу потоков на юзера?
источник

YG

Yury Golikov in pro.jvm
Alexandr Emelyanov
так у тебя не по пулу потоков на юзера?
Не. Мне же как раз обрабатывать надо последовательно ток запросы от одного юзера. А запросы от разных юзеров - могут параллельно
источник

nn

nasdaq nice in pro.jvm
ну типа объект внутри которого твоя хешмапа будет лежать
а как он будет ставить лок и тем более снимать надо подумать ))00)0))))))
источник

AE

Alexandr Emelyanov in pro.jvm
Yury Golikov
Привет
Мне нужно обрабатывать запросы для каждого юзера (по userID) последовательно. Те чтобы запросы от одного userID не могли идти на параллельных потоках. При этом отменять такие запросы нельзя, нужно класть во очередь.

Как это лучше всего сделать?

Пока по топорному думаю сделать так:
HashMap по UserID c очередями для запросов. Ищем в мапе по UserID, если нет очереди - значит пушим задачу в ThreadPoolExecutor. Если есть, значит ставим в очередь. В свою очередь после выполнения таски подвешиваем хук afterExecute и ищем новую задачу для этого UserID если есть.
знаешь какую я проблему вижу? может случиться так, что у тебя парллельно может обрабатываться только Х эзеров, у них очередь всегда забита, на другие очереди переключения у тебя нет
источник

AE

Alexandr Emelyanov in pro.jvm
эта задача идельна для каналов корутин
источник

YG

Yury Golikov in pro.jvm
Alexandr Emelyanov
знаешь какую я проблему вижу? может случиться так, что у тебя парллельно может обрабатываться только Х эзеров, у них очередь всегда забита, на другие очереди переключения у тебя нет
Ну поэтому моя задумка была, чтобы поток например по завершению задачи для юзера№3 - клал новую задачу из очереди юзера№3 в общую очередь пула
источник

AE

Alexandr Emelyanov in pro.jvm
как вариант
источник

YG

Yury Golikov in pro.jvm
А вообще пытаюсь загуглить, ток хз как это проблема/задача называется.
У кафки есть что то похожее когда она бьет по партициям
источник

AG

Alexey Genus in pro.jvm
А, что если попробовать ConcurrentMap<UserId, BlockingQueue<Task>>?
Немного прямолинейно, но должно работать.
источник

nn

nasdaq nice in pro.jvm
Yury Golikov
Ну тогда если у меня в системе лям юзеров, мне нужно будет держать всегда лям потоков?)
крч
пусть будет объект - ресурс который внутри себя хранит мапу с очередями запросов
пусть у него будет get() метод который будет раунд робином обходить id и возвращать запрос от юзера и лочить этот id
потом пусть есть у тебя запрос от юзера будем  считать что это Runnable
обернем его в Callable который при завершении таски будет возвращать ID
делаешь таску, потом unlock(id) на своем ресурсе и при след обходе все будет ок
и тут все-таки понадобится поток который будет обходить все это
но такое этот подход не fault tolerance
источник

AG

Alexey Genus in pro.jvm
С мапой плохо. Вот так лучше.
С одной стороны складываем задачи в очередь.

С другой стороны делаем один рабочий поток и несколько однопоточных тред-пулов.

Рабочий поток читает из этой очереди задачи и раскидывает задачи согласно какому-нибудь алгоритму консистентного хеширования.

Таким образом и потоков мало, и сохраняется порядок для каждого пользователя при контролируемом количестве потоков.
источник

DB

Dmitry Baynak in pro.jvm
держать linkedhashset<userid> (просто покрытый локом; нужно что-то с порядком + уникальностью э-тов) свободных userid, у которых есть невыполненные задачи, + CHM<userid, blockingqueue<task>>, как чуть выше написано
тред выполнил 1 задачу для конкретного userid? проверяет есть ли ещё задачи (только он сейчас владеет userid, поэтому это безопасно) - если есть ещё, добавляет в конец blockingqueue
если приходит задача извне, тред добавляет в linkedhashset (не обновляя порядок) + в соотв. очередь складывает задачу (тут может быть ситуация что внешний тред лезет в конкретный userid и какой-то воркер владеет этим userid - поэтому стоит поддерживать уникальность на уровне списка свободных userid)
источник

YG

Yury Golikov in pro.jvm
nasdaq nice
крч
пусть будет объект - ресурс который внутри себя хранит мапу с очередями запросов
пусть у него будет get() метод который будет раунд робином обходить id и возвращать запрос от юзера и лочить этот id
потом пусть есть у тебя запрос от юзера будем  считать что это Runnable
обернем его в Callable который при завершении таски будет возвращать ID
делаешь таску, потом unlock(id) на своем ресурсе и при след обходе все будет ок
и тут все-таки понадобится поток который будет обходить все это
но такое этот подход не fault tolerance
А почему ты считаешь, что раунд робин тут лучше, чем проверять есть ли уже очередь для задач юзера и в итоге класть задачи в общую очередь?
источник

nn

nasdaq nice in pro.jvm
Alexey Genus
С мапой плохо. Вот так лучше.
С одной стороны складываем задачи в очередь.

С другой стороны делаем один рабочий поток и несколько однопоточных тред-пулов.

Рабочий поток читает из этой очереди задачи и раскидывает задачи согласно какому-нибудь алгоритму консистентного хеширования.

Таким образом и потоков мало, и сохраняется порядок для каждого пользователя при контролируемом количестве потоков.
огонь! главное распределение равномерным выдержать
источник