Привет
Делаю через processor api несколько нод процессоров
В одну добавляю глобальное хранилище, в следующую - обычное
Но обычное почему-то возвращает нулл в init() методе
keyValueStore в следующем процессоре, где вся логика, не может найтись. Метод init выглядит так:
@Override public void init(ProcessorContext processorContext) {
super.init(processorContext);
this.keyValueStore = (KeyValueStore) context().getStateStore(stateStoreName);
}
Но context().getStateStore(stateStoreName); возвращает null.
Топология такая
topology.addGlobalStore(storeKeyBuilder, transactionSourceNodeName, stringDeserializer, transactionDeserializer,
inTopic, fakeProcessorNodeName, transactionSupplier)
.addProcessor(transactionProcessorNodeName, () -> new TransactionProcessor(transactionsStateStore, bonus, bonusNum),
fakeProcessorNodeName)
.addStateStore(storeBuilder, transactionProcessorNodeName)
.addSink(transactionSinkNodeName, "analytics", stringSerializer, transactionPerformanceSerializer, transactionProcessorNodeName);
Кончено, я подумал, что виной всему наличие глобального хранилища. Но однозначных подтверждений в интернете не нашел. Да и какой смысл в промежуточном глобальном хранилище, если больше никаким нельзя будет пользоваться. Я смотрел внимательно свой код, но нигде не смог найти подвох.
Вопрос: неужели нельзя сделать два хранилища для двух нод? Или я что-то не так делаю