diff --git a/trading-bot/src/main/java/ru/ct/belfort/db/IdeasRepository.java b/trading-bot/src/main/java/ru/ct/belfort/db/IdeasRepository.java index 97cf27a..0b7fb19 100644 --- a/trading-bot/src/main/java/ru/ct/belfort/db/IdeasRepository.java +++ b/trading-bot/src/main/java/ru/ct/belfort/db/IdeasRepository.java @@ -24,25 +24,24 @@ public IdeasRepository(DataSource dataSource) { public void insert(IdeaDTO idea) { jdbcTemplate.update(""" - INSERT INTO ideas(score, time) VALUES(?, CURRENT_TIMESTAMP(0)) + INSERT INTO ideas(score, time) VALUES(?, CURRENT_TIMESTAMP) """, idea.score()); - debugOutput(); } public List selectAll() { return jdbcTemplate.query("SELECT * FROM ideas", mapper); } - public void deleteAll() { - jdbcTemplate.update("DELETE FROM ideas"); + public Integer getRecordsAmount() { + return jdbcTemplate.queryForObject("SELECT COUNT(*) FROM ideas", Integer.class); } - // TODO: Remove this when db tests appear - private void debugOutput() { - log.info("Inserted"); - for (IdeaEntity it : selectAll()) { - log.info(it.toString()); - } + public IdeaEntity getLastRecord() { + return jdbcTemplate.query("SELECT * FROM ideas ORDER BY time DESC LIMIT 1", mapper).get(0); + } + + public void deleteAll() { + jdbcTemplate.update("DELETE FROM ideas"); } private static class IdeaEntityMapper implements RowMapper { diff --git a/trading-bot/src/test/java/ru/ct/belfort/Utils.java b/trading-bot/src/test/java/ru/ct/belfort/Utils.java index f91be68..aa37f71 100644 --- a/trading-bot/src/test/java/ru/ct/belfort/Utils.java +++ b/trading-bot/src/test/java/ru/ct/belfort/Utils.java @@ -39,4 +39,32 @@ public static TradingInfoDTO genRandomTradingInfoDTO(String strategy) { var candles = genRandomCandles(10, 5, 10); return new TradingInfoDTO(candles, strategy); } + + public static TradingInfoDTO badCandlesSample() { + return new TradingInfoDTO( + closePricesToCandles(new double[]{500, 493, 491, 485, 483, 482, 480, 467, 463, 461}), + "rsi" + ); + } + + public static TradingInfoDTO goodCandlesSample() { + return new TradingInfoDTO( + closePricesToCandles(new double[]{500, 505, 517, 523, 524, 525, 536, 541, 547, 555}), + "rsi" + ); + } + + public static TradingInfoDTO okCandlesSample() { + return new TradingInfoDTO( + closePricesToCandles(new double[]{500, 500, 500, 500, 500, 500, 500, 500, 500, 500}), + "rsi" + ); + } + + public static TradingInfoDTO unknownStrategySample() { + return new TradingInfoDTO( + closePricesToCandles(new double[]{}), + "fds7f757das78f7sd" + ); + } } diff --git a/trading-bot/src/test/java/ru/ct/belfort/base/KafkaAndPostgresTestBase.java b/trading-bot/src/test/java/ru/ct/belfort/base/KafkaAndPostgresTestBase.java new file mode 100644 index 0000000..b294c8e --- /dev/null +++ b/trading-bot/src/test/java/ru/ct/belfort/base/KafkaAndPostgresTestBase.java @@ -0,0 +1,93 @@ +package ru.ct.belfort.base; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.BeforeAll; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerializer; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; +import ru.ct.belfort.IdeaDTO; +import ru.ct.belfort.TradingInfoDTO; +import ru.ct.belfort.kafka.producers.ErrorProducerConfig; +import ru.ct.belfort.kafka.producers.IdeasProducerConfig; + +import java.util.Collections; +import java.util.Map; + +@SpringBootTest +@Testcontainers +@DirtiesContext +public abstract class KafkaAndPostgresTestBase { + + @Container + protected static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")); + + @Container + protected static PostgreSQLContainer postgres = new PostgreSQLContainer<>("postgres"); + + @DynamicPropertySource + static void kafkaProperties(DynamicPropertyRegistry registry) { + registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers); + registry.add("spring.datasource.drivername", postgres::getDriverClassName); + registry.add("spring.datasource.url", postgres::getJdbcUrl); + registry.add("spring.datasource.username", postgres::getUsername); + registry.add("spring.datasource.password", postgres::getPassword); + } + + protected static KafkaProducer producer = null; + protected static KafkaConsumer ideasConsumer = null; + protected static KafkaConsumer errorConsumer = null; + + @BeforeAll + public static void init() { + + JsonDeserializer jsonDeserializer = new JsonDeserializer<>(); + jsonDeserializer.addTrustedPackages("*"); + + producer = new KafkaProducer<>( + Map.of( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(), + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class, + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class, + ProducerConfig.CLIENT_ID_CONFIG, "candles_test_producer" + ), + new StringSerializer(), + new JsonSerializer<>() + ); + + ideasConsumer = new KafkaConsumer<>( + Map.of( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(), + ConsumerConfig.GROUP_ID_CONFIG, "ideas_test_consumers", + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest" + ), + new StringDeserializer(), + jsonDeserializer + ); + + errorConsumer = new KafkaConsumer<>( + Map.of( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(), + ConsumerConfig.GROUP_ID_CONFIG, "error_test_consumers", + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest" + ), + new StringDeserializer(), + new StringDeserializer() + ); + + ideasConsumer.subscribe(Collections.singletonList(IdeasProducerConfig.TOPIC)); + errorConsumer.subscribe(Collections.singletonList(ErrorProducerConfig.TOPIC)); + } +} diff --git a/trading-bot/src/test/java/ru/ct/belfort/db/PostgresTest.java b/trading-bot/src/test/java/ru/ct/belfort/db/PostgresTest.java new file mode 100644 index 0000000..ea6a660 --- /dev/null +++ b/trading-bot/src/test/java/ru/ct/belfort/db/PostgresTest.java @@ -0,0 +1,65 @@ +package ru.ct.belfort.db; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.testcontainers.junit.jupiter.Testcontainers; +import ru.ct.belfort.TradingInfoDTO; +import ru.ct.belfort.base.KafkaAndPostgresTestBase; +import ru.ct.belfort.kafka.consumers.CandlesConsumerConfig; +import ru.ct.belfort.strategy.RsiStrategy; + +import java.time.Duration; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static ru.ct.belfort.Utils.*; + +@Testcontainers +@SpringBootTest +public class PostgresTest extends KafkaAndPostgresTestBase { + + @Autowired + IdeasRepository repository; + + private static final long MAX_BLOCK_TIME = 1000; + + private static void sendAndWait(TradingInfoDTO message) { + producer.send(new ProducerRecord<>(CandlesConsumerConfig.TOPIC, message)); + ideasConsumer.poll(Duration.ofMillis(MAX_BLOCK_TIME)); + } + + @Test + public void sendBadCandles_ExpectDBInsertion() { + final var message = badCandlesSample(); + var before = repository.getRecordsAmount(); + sendAndWait(message); + assertEquals(before + 1, repository.getRecordsAmount()); + double score = new RsiStrategy().predict(message.candles()); + assertEquals(repository.getLastRecord().score(), score); + } + + @Test + public void sendGoodCandles_ExpectDBInsertion() { + final var message = goodCandlesSample(); + var before = repository.getRecordsAmount(); + sendAndWait(message); + assertEquals(before + 1, repository.getRecordsAmount()); + double score = new RsiStrategy().predict(message.candles()); + assertEquals(repository.getLastRecord().score(), score); + } + + @Test + public void sendOkCandles_ExpectNoDBInsertion() { + var before = repository.getRecordsAmount(); + sendAndWait(okCandlesSample()); + assertEquals(before, repository.getRecordsAmount()); + } + + @Test + public void sendUnknownStrategy_ExpectNoDBInsertion() { + var before = repository.getRecordsAmount(); + sendAndWait(unknownStrategySample()); + assertEquals(before, repository.getRecordsAmount()); + } +} diff --git a/trading-bot/src/test/java/ru/ct/belfort/kafka/KafkaTest.java b/trading-bot/src/test/java/ru/ct/belfort/kafka/KafkaTest.java index 2d82087..9bee00a 100644 --- a/trading-bot/src/test/java/ru/ct/belfort/kafka/KafkaTest.java +++ b/trading-bot/src/test/java/ru/ct/belfort/kafka/KafkaTest.java @@ -1,108 +1,34 @@ package ru.ct.belfort.kafka; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.kafka.support.serializer.JsonDeserializer; -import org.springframework.kafka.support.serializer.JsonSerializer; -import org.springframework.test.context.DynamicPropertyRegistry; -import org.springframework.test.context.DynamicPropertySource; -import org.testcontainers.containers.KafkaContainer; -import org.testcontainers.containers.PostgreSQLContainer; -import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.utility.DockerImageName; import ru.ct.belfort.Advice; import ru.ct.belfort.IdeaDTO; -import ru.ct.belfort.TradingInfoDTO; +import ru.ct.belfort.base.KafkaAndPostgresTestBase; import ru.ct.belfort.kafka.consumers.CandlesConsumerConfig; import ru.ct.belfort.kafka.producers.ErrorProducerConfig; import ru.ct.belfort.kafka.producers.IdeasProducerConfig; import ru.ct.belfort.strategy.RsiStrategy; import java.time.Duration; -import java.util.Collections; -import java.util.Map; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.tuple; -import static ru.ct.belfort.Utils.closePricesToCandles; +import static ru.ct.belfort.Utils.*; @Testcontainers @SpringBootTest -public class KafkaTest { +public class KafkaTest extends KafkaAndPostgresTestBase { - @Container - static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")); + private static final long MAX_BLOCK_TIME = 1000; - @Container - static PostgreSQLContainer postgres = new PostgreSQLContainer<>("postgres"); - - @DynamicPropertySource - static void kafkaProperties(DynamicPropertyRegistry registry) { - registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers); - registry.add("spring.datasource.drivername", postgres::getDriverClassName); - registry.add("spring.datasource.url", postgres::getJdbcUrl); - registry.add("spring.datasource.username", postgres::getUsername); - registry.add("spring.datasource.password", postgres::getPassword); - } - - static KafkaProducer producer = null; - static KafkaConsumer ideasConsumer = null; - static KafkaConsumer errorConsumer = null; - - @BeforeAll - public static void init() { - - JsonDeserializer jsonDeserializer = new JsonDeserializer<>(); - jsonDeserializer.addTrustedPackages("*"); - - producer = new KafkaProducer<>( - Map.of( - ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(), - ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class, - ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class, - ProducerConfig.CLIENT_ID_CONFIG, "candles_test_producer" - ), - new StringSerializer(), - new JsonSerializer<>() - ); - - ideasConsumer = new KafkaConsumer<>( - Map.of( - ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(), - ConsumerConfig.GROUP_ID_CONFIG, "ideas_test_consumers", - ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest" - ), - new StringDeserializer(), - jsonDeserializer - ); - - errorConsumer = new KafkaConsumer<>( - Map.of( - ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(), - ConsumerConfig.GROUP_ID_CONFIG, "error_test_consumers", - ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest" - ), - new StringDeserializer(), - new StringDeserializer() - ); - - ideasConsumer.subscribe(Collections.singletonList(IdeasProducerConfig.TOPIC)); - errorConsumer.subscribe(Collections.singletonList(ErrorProducerConfig.TOPIC)); - } - - public void expectMessage(KafkaConsumer consumer, String topic, V message) { - ConsumerRecords records = consumer.poll(Duration.ofMillis(1000)); + public static void expectMessage(KafkaConsumer consumer, String topic, V message) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(MAX_BLOCK_TIME)); assertThat(records) .isNotEmpty() .hasSize(1) @@ -110,18 +36,15 @@ public void expectMessage(KafkaConsumer consumer, String topic, V .containsExactly(tuple(topic, message)); } - public void expectNoMessage(KafkaConsumer consumer) { - ConsumerRecords records = consumer.poll(Duration.ofMillis(1000)); + public static void expectNoMessage(KafkaConsumer consumer) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(MAX_BLOCK_TIME)); assertThat(records) .isEmpty(); } @Test public void sendBadCandles_ExpectBuyIdea() { - final var message = new TradingInfoDTO( - closePricesToCandles(new double[]{500, 493, 491, 485, 483, 482, 480, 467, 463, 461}), - "rsi" - ); + final var message = badCandlesSample(); producer.send(new ProducerRecord<>(CandlesConsumerConfig.TOPIC, message)); double score = new RsiStrategy().predict(message.candles()); expectMessage(ideasConsumer, IdeasProducerConfig.TOPIC, new IdeaDTO(score, Advice.BUY)); @@ -130,10 +53,7 @@ public void sendBadCandles_ExpectBuyIdea() { @Test public void sendGoodCandles_ExpectSellIdea() { - final var message = new TradingInfoDTO( - closePricesToCandles(new double[]{500, 505, 517, 523, 524, 525, 536, 541, 547, 555}), - "rsi" - ); + final var message = goodCandlesSample(); producer.send(new ProducerRecord<>(CandlesConsumerConfig.TOPIC, message)); double score = new RsiStrategy().predict(message.candles()); expectMessage(ideasConsumer, IdeasProducerConfig.TOPIC, new IdeaDTO(score, Advice.SELL)); @@ -142,22 +62,14 @@ public void sendGoodCandles_ExpectSellIdea() { @Test public void sendOkCandles_ExpectNoIdea() { - final var message = new TradingInfoDTO( - closePricesToCandles(new double[]{500, 500, 500, 500, 500, 500, 500, 500, 500, 500}), - "rsi" - ); - producer.send(new ProducerRecord<>(CandlesConsumerConfig.TOPIC, message)); + producer.send(new ProducerRecord<>(CandlesConsumerConfig.TOPIC, okCandlesSample())); expectNoMessage(ideasConsumer); expectNoMessage(errorConsumer); } @Test public void sendUnknownStrategy_ExpectError() { - final var message = new TradingInfoDTO( - closePricesToCandles(new double[]{}), - "fds7f757das78f7sd" - ); - producer.send(new ProducerRecord<>(CandlesConsumerConfig.TOPIC, message)); + producer.send(new ProducerRecord<>(CandlesConsumerConfig.TOPIC, unknownStrategySample())); expectMessage(errorConsumer, ErrorProducerConfig.TOPIC, "Unknown strategy"); expectNoMessage(ideasConsumer); }