Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

The client is not authorized to access this topic #2974

Open
aethir-paas opened this issue Aug 27, 2024 · 1 comment
Open

The client is not authorized to access this topic #2974

aethir-paas opened this issue Aug 27, 2024 · 1 comment

Comments

@aethir-paas
Copy link

aethir-paas commented Aug 27, 2024

Description

When I use SASL OAUTHBEARER authentication, is there an internal mechanism to automatically refresh the token? Currently, my service encounters the error: "The client is not authorized to access this topic."

Versions
Sarama Kafka Go
v1.43.2 v3.5.1 V1.19
Configuration
type accessTokenProvider struct {
	awsRegion   string
	awsProfiles string
	token       string
	mu          sync.RWMutex
}

func (m *accessTokenProvider) Token() (*sarama.AccessToken, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	var err error
	if m.awsProfiles == "" {
		m.token, _, err = signer.GenerateAuthToken(context.Background(), m.awsRegion)
		return &sarama.AccessToken{Token: m.token}, err
	} else {
		m.token, _, err = signer.GenerateAuthTokenFromProfile(context.Background(), m.awsRegion, m.awsProfiles)
		return &sarama.AccessToken{Token: m.token}, err
	}
}

func (m *accessTokenProvider) RefreshToken() {
	for {
		var token string
		var err error
		if m.awsProfiles == "" {
			token, _, err = signer.GenerateAuthToken(context.Background(), m.awsRegion)
		} else {
			token, _, err = signer.GenerateAuthTokenFromProfile(context.Background(), m.awsRegion, m.awsProfiles)
		}
		if err != nil {
			fmt.Println("RefreshToken error :", err)
		}

		m.mu.Lock()
		m.token = token
		m.mu.Unlock()

		// 等待令牌到期前的一段时间再刷新
		time.Sleep(10 * time.Minute)
	}
}

func InitConfig(awsRegion string, awsProfiles string, SASL, TLS bool) *sarama.Config {
	/*err := godotenv.Load()
	if err != nil {
		log.Fatal("Error loading .env file")
	}
	*/
	/*awsRegion, hasEnvRegion := os.LookupEnv(awsRegion)
	if !hasEnvRegion {
		log.Fatal("AWS_REGION environment variable not set")
	}*/

	configure := sarama.NewConfig()
	if SASL {
		configure.Net.SASL.Enable = true
		configure.Net.SASL.Mechanism = sarama.SASLTypeOAuth

		tokenProvider := &accessTokenProvider{
			awsRegion:   awsRegion,
			awsProfiles: awsProfiles,
		}
		configure.Net.SASL.TokenProvider = tokenProvider
		go func() {
			tokenProvider.RefreshToken()
		}()
	}

	if TLS {
		configure.Net.TLS.Enable = true
		configure.Net.TLS.Config = &tls.Config{}
	}
	configure.Consumer.Offsets.Initial = sarama.OffsetOldest
	return configure
}
Logs
The client is not authorized to access this topic

2024-08-26 20:00:00.528033078 +0000 UTC m=+43107.359917041 kafka: Failed to produce message to topic container_and_session_event: kafka server: The client is not authorized to access this topic

2024-08-26 20:00:01.528134796 +0000 UTC m=+43108.360018739 kafka: Failed to produce message to topic container_and_session_event: kafka server: The client is not authorized to access this topic

2024-08-26 20:00:02.867247119 +0000 UTC m=+43109.699131083 kafka: Failed to produce message to topic container_and_session_event: kafka server: The client is not authorized to access this topic

2024-08-26 20:00:05.02019884 +0000 UTC m=+43111.852082706 kafka: Failed to produce message to topic container_and_session_event: kafka server: The client is not authorized to access this topic

Additional Context

I reviewed the relevant documentation, and it seems that when the producer sends a message, it triggers authenticateViaSASLv1, which retrieves the latest token through an interface class. However, this behavior is not as I expected. Currently, the authentication fails periodically after some time. I’m not sure what internal mechanism could be used to refresh the token automatically.

@JunliWang
Copy link
Member

JunliWang commented Sep 28, 2024

Broker has per listener config that forces clients to re-authenticate:
https://kafka.apache.org/documentation/#brokerconfigs_connections.max.reauth.ms
By default it is off.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants
@JunliWang @aethir-paas and others