跳到主內容

producer_consumer範例

gradle

	// Kafka (via Spring for Apache Kafka integration)
	// https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka
	implementation("org.springframework.kafka:spring-kafka")
	// https://mvnrepository.com/artifact/org.slf4j/slf4j-api
	implementation("org.slf4j:slf4j-api:2.0.5")
	// https://mvnrepository.com/artifact/org.slf4j/slf4j-simple
	implementation("org.slf4j:slf4j-simple:2.0.5")
	testImplementation("org.junit.jupiter:junit-jupiter-api:5.9.2")
	testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine")

properties

kafka.listen.start=true
spring.application.name=kafka_demo
spring.kafka.bootstrap-servers=10.20.30.40:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.properties.retries=10
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.group-id=group1

producer


@Controller
@RequestMapping("/api")
public class ProducerController {

    /** Auto-configured from the spring.kafka.* properties; String key/value serializers. */
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Demo endpoint: publishes 100 records ("data0".."data99") to the "first" topic,
     * using the same string as both key and value.
     *
     * <p>Note: {@code KafkaTemplate#send} is asynchronous — the records are queued
     * for delivery when this method returns, not necessarily acknowledged yet.
     *
     * @return 200 OK with a JSON success body
     */
    @GetMapping("/kafka")
    public ResponseEntity<String> send() {
        for (int i = 0; i < 100; i++) {
            kafkaTemplate.send("first", "data" + i, "data" + i);
        }
        // Key must be quoted for the body to be valid JSON (was {success:true}).
        return ResponseEntity.ok("{\"success\":true}");
    }

}

consumer

package com.momo.appteam.liveapi.listener;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

@Component
public class LiveStreamKafkaListener {

    /**
     * Consumes records from the "first" topic under the default configured
     * consumer group. The listener only starts when the {@code kafka.listen.start}
     * property is true (defaults to false).
     *
     * @param record the received record; key and value are Strings per the
     *               configured StringDeserializer
     */
    @KafkaListener(topics = "first", autoStartup = "${kafka.listen.start:false}")
    public void consumeLiveStream(ConsumerRecord<String, String> record) {
        String topic = record.topic();
        String key = record.key();
        String value = record.value();
        System.out.println("收到的信息: 【topic】" + topic + "【key】" + key + "【value】" + value );
        // Example of what ConsumerRecord#toString() looks like:
        // ConsumerRecord(
        //   topic = momoLiveStream, partition = 0,
        //   leaderEpoch = 0, offset = 760,
        //   CreateTime = 1697695508640,
        //   serialized key size = 6,
        //   serialized value size = 9,
        //   headers = RecordHeaders(headers = [], isReadOnly = false),
        //   key = liveId, value = afasffasf
        // )
    }

    /**
     * Re-reads the "first" topic from the beginning (partition 0, offset 0)
     * under a separate consumer group, so it receives all historical records
     * in that partition rather than only new ones.
     *
     * @param record the received record; key and value are Strings per the
     *               configured StringDeserializer
     */
    @KafkaListener(
            groupId = "default",
            autoStartup = "${kafka.listen.start:false}",
            topicPartitions = {
                @TopicPartition(
                        topic = "first",
                        partitionOffsets = {
                            @PartitionOffset(partition = "0", initialOffset = "0")
                        }
                )
            })
    public void consumeAllLiveStream(ConsumerRecord<String, String> record) {
        String topic = record.topic();
        String key = record.key();
        String value = record.value();
        System.out.println("收到的信息: 【topic】" + topic + "【key】" + key + "【value】" + value );
    }

}