Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support the ability to deploy a Kafka proxy #407

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions api/v1/ytsaurus_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,14 @@ type TCPProxiesSpec struct {
Role string `json:"role,omitempty"`
}

type KafkaProxiesSpec struct {
InstanceSpec `json:",inline"`
ServiceType *corev1.ServiceType `json:"serviceType,omitempty"`
//+kubebuilder:default:=default
//+kubebuilder:validation:MinLength:=1
Role string `json:"role,omitempty"`
}

// ClusterNodesSpec is a common part of spec for nodes of all flavors.
type ClusterNodesSpec struct {
// List of the node tags.
Expand Down Expand Up @@ -661,9 +669,10 @@ type YtsaurusSpec struct {
//+optional
MasterCaches *MasterCachesSpec `json:"masterCaches,omitempty"`
// +kubebuilder:validation:MinItems:=1
HTTPProxies []HTTPProxiesSpec `json:"httpProxies,omitempty"`
RPCProxies []RPCProxiesSpec `json:"rpcProxies,omitempty"`
TCPProxies []TCPProxiesSpec `json:"tcpProxies,omitempty"`
HTTPProxies []HTTPProxiesSpec `json:"httpProxies,omitempty"`
RPCProxies []RPCProxiesSpec `json:"rpcProxies,omitempty"`
TCPProxies []TCPProxiesSpec `json:"tcpProxies,omitempty"`
KafkaProxies []KafkaProxiesSpec `json:"kafkaProxies,omitempty"`
// +kubebuilder:validation:MinItems:=1
DataNodes []DataNodesSpec `json:"dataNodes,omitempty"`
ExecNodes []ExecNodesSpec `json:"execNodes,omitempty"`
Expand Down
2,424 changes: 2,424 additions & 0 deletions config/crd/bases/cluster.ytsaurus.tech_ytsaurus.yaml

Large diffs are not rendered by default.

171 changes: 171 additions & 0 deletions config/samples/cluster_v1_local_with_kafka.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
apiVersion: cluster.ytsaurus.tech/v1
kind: Ytsaurus
metadata:
name: minisaurus
spec:
coreImage: ghcr.io/ytsaurus/ytsaurus-nightly:dev-24.2-2024-12-06-ee9e61d820c81624e2562edca351e9bc8bdd7e2e

discovery:
instanceCount: 1

# Make reusable loggers config with yaml anchor.
loggers: &loggers
- name: debug
compression: zstd
minLogLevel: debug
writerType: file
rotationPolicy: &rotationPolicy
maxTotalSizeToKeep: 10000000
rotationPeriodMilliseconds: 900000
categoriesFilter:
type: exclude
values: [ "Bus", "Concurrency" ]
- name: info
minLogLevel: info
writerType: file
rotationPolicy: *rotationPolicy
- name: error
minLogLevel: error
writerType: stderr

kafkaProxies:
- serviceType: NodePort
loggers: *loggers
instanceCount: 1
role: default
image: ghcr.io/ytsaurus/ytsaurus-nightly:dev-24.2-2024-12-26-d79040f9b4037aabf1f6b686e442cc029679a285

primaryMasters:
instanceCount: 1
cellTag: 1
loggers: *loggers
locations:
- locationType: MasterChangelogs
path: /yt/master-data/master-changelogs
- locationType: MasterSnapshots
path: /yt/master-data/master-snapshots

volumeMounts:
- name: master-data
mountPath: /yt/master-data

volumeClaimTemplates:
- metadata:
name: master-data
spec:
accessModes: [ "ReadWriteOnce" ]
resources:
requests:
storage: 5Gi

httpProxies:
- serviceType: NodePort
loggers: *loggers
instanceCount: 1
role: default
- serviceType: NodePort
loggers: *loggers
instanceCount: 1
role: control

rpcProxies:
- instanceCount: 1
loggers: *loggers
role: default
- instanceCount: 1
loggers: *loggers
role: heavy

dataNodes:
- instanceCount: 3
loggers: *loggers

volumeMounts:
- name: node-data
mountPath: /yt/node-data

locations:
- locationType: ChunkStore
path: /yt/node-data/chunk-store

volumeClaimTemplates:
- metadata:
name: node-data
spec:
accessModes: [ "ReadWriteOnce" ]
resources:
requests:
storage: 5Gi

execNodes:
- instanceCount: 1
loggers: *loggers
resources:
limits:
cpu: 1
memory: 2Gi

volumeMounts:
- name: node-data
mountPath: /yt/node-data

volumes:
- name: node-data
emptyDir:
sizeLimit: 5Gi

locations:
- locationType: ChunkCache
path: /yt/node-data/chunk-cache
- locationType: Slots
path: /yt/node-data/slots

jobProxyLoggers:
- name: debug
compression: zstd
minLogLevel: debug
writerType: file
useTimestampSuffix: true
rotationPolicy: &rotationPolicy
maxTotalSizeToKeep: 10000000
rotationPeriodMilliseconds: 900000
categoriesFilter:
type: exclude
values: [ "Bus", "Concurrency" ]
- name: info
minLogLevel: info
writerType: file
rotationPolicy: *rotationPolicy
- name: error
minLogLevel: error
writerType: stderr

schedulers:
instanceCount: 1
loggers: *loggers

controllerAgents:
instanceCount: 1
loggers: *loggers

ui:
image: ghcr.io/ytsaurus/ui:stable
serviceType: NodePort
instanceCount: 1

strawberry:
resources:
limits:
memory: 100Mi
image: ghcr.io/ytsaurus/strawberry:0.0.11

tabletNodes:
- instanceCount: 1

yqlAgents:
instanceCount: 1
image: ghcr.io/ytsaurus/query-tracker:0.0.6

queryTrackers:
instanceCount: 1
image: ghcr.io/ytsaurus/query-tracker:0.0.6
8 changes: 8 additions & 0 deletions controllers/component_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,14 @@ func NewComponentManager(
allComponents = append(allComponents, tps...)
}

if len(resource.Spec.KafkaProxies) > 0 {
var kps []components.Component
for _, kpSpec := range ytsaurus.GetResource().Spec.KafkaProxies {
kps = append(kps, components.NewKafkaProxy(cfgen, ytsaurus, m, kpSpec))
}
allComponents = append(allComponents, kps...)
}

var ends []components.Component
if len(resource.Spec.ExecNodes) > 0 {
for _, endSpec := range ytsaurus.GetResource().Spec.ExecNodes {
Expand Down
49 changes: 49 additions & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,7 @@ _Appears in:_
- [ExecNodesSpec](#execnodesspec)
- [HTTPProxiesSpec](#httpproxiesspec)
- [InstanceSpec](#instancespec)
- [KafkaProxiesSpec](#kafkaproxiesspec)
- [MasterCachesSpec](#mastercachesspec)
- [MastersSpec](#mastersspec)
- [QueryTrackerSpec](#querytrackerspec)
Expand Down Expand Up @@ -645,6 +646,7 @@ _Appears in:_
- [ExecNodesSpec](#execnodesspec)
- [HTTPProxiesSpec](#httpproxiesspec)
- [InstanceSpec](#instancespec)
- [KafkaProxiesSpec](#kafkaproxiesspec)
- [MasterCachesSpec](#mastercachesspec)
- [MastersSpec](#mastersspec)
- [QueryTrackerSpec](#querytrackerspec)
Expand Down Expand Up @@ -682,6 +684,7 @@ _Appears in:_
- [DiscoverySpec](#discoveryspec)
- [ExecNodesSpec](#execnodesspec)
- [HTTPProxiesSpec](#httpproxiesspec)
- [KafkaProxiesSpec](#kafkaproxiesspec)
- [MasterCachesSpec](#mastercachesspec)
- [MastersSpec](#mastersspec)
- [QueryTrackerSpec](#querytrackerspec)
Expand Down Expand Up @@ -745,6 +748,47 @@ _Appears in:_
| `doNotSetUserId` _boolean_ | Do not use slot user id for running jobs. | | |


#### KafkaProxiesSpec







_Appears in:_
- [YtsaurusSpec](#ytsaurusspec)

| Field | Description | Default | Validation |
| --- | --- | --- | --- |
| `image` _string_ | Overrides coreImage for component. | | |
| `entrypointWrapper` _string array_ | Specifies wrapper for component container command. | | |
| `volumes` _[Volume](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.28/#volume-v1-core) array_ | | | |
| `volumeMounts` _[VolumeMount](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.28/#volumemount-v1-core) array_ | | | |
| `readinessProbeParams` _[HealthcheckProbeParams](#healthcheckprobeparams)_ | | | |
| `resources` _[ResourceRequirements](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.28/#resourcerequirements-v1-core)_ | | | |
| `instanceCount` _integer_ | | | |
| `minReadyInstanceCount` _integer_ | | | |
| `locations` _[LocationSpec](#locationspec) array_ | | | |
| `volumeClaimTemplates` _[EmbeddedPersistentVolumeClaim](#embeddedpersistentvolumeclaim) array_ | | | |
| `runtimeClassName` _string_ | | | |
| `enableAntiAffinity` _boolean_ | Deprecated: use Affinity.PodAntiAffinity instead. | | |
| `hostNetwork` _boolean_ | Use the host's network namespace, this overrides global option. | | |
| `monitoringPort` _integer_ | | | |
| `loggers` _[TextLoggerSpec](#textloggerspec) array_ | | | |
| `structuredLoggers` _[StructuredLoggerSpec](#structuredloggerspec) array_ | | | |
| `affinity` _[Affinity](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.28/#affinity-v1-core)_ | | | |
| `nodeSelector` _object (keys:string, values:string)_ | | | |
| `tolerations` _[Toleration](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.28/#toleration-v1-core) array_ | | | |
| `podLabels` _object (keys:string, values:string)_ | | | |
| `podAnnotations` _object (keys:string, values:string)_ | | | |
| `setHostnameAsFqdn` _boolean_ | SetHostnameAsFQDN indicates whether to set the hostname as FQDN. | true | |
| `terminationGracePeriodSeconds` _integer_ | Optional duration in seconds the pod needs to terminate gracefully. | | |
| `nativeTransport` _[RPCTransportSpec](#rpctransportspec)_ | Component config for native RPC bus transport. | | |
| `serviceType` _[ServiceType](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.28/#servicetype-v1-core)_ | | | |
| `role` _string_ | | default | MinLength: 1 <br /> |


#### LocationSpec


Expand All @@ -760,6 +804,7 @@ _Appears in:_
- [ExecNodesSpec](#execnodesspec)
- [HTTPProxiesSpec](#httpproxiesspec)
- [InstanceSpec](#instancespec)
- [KafkaProxiesSpec](#kafkaproxiesspec)
- [MasterCachesSpec](#mastercachesspec)
- [MastersSpec](#mastersspec)
- [QueryTrackerSpec](#querytrackerspec)
Expand Down Expand Up @@ -1222,6 +1267,7 @@ _Appears in:_
- [ExecNodesSpec](#execnodesspec)
- [HTTPProxiesSpec](#httpproxiesspec)
- [InstanceSpec](#instancespec)
- [KafkaProxiesSpec](#kafkaproxiesspec)
- [MasterCachesSpec](#mastercachesspec)
- [MastersSpec](#mastersspec)
- [QueryTrackerSpec](#querytrackerspec)
Expand Down Expand Up @@ -1816,6 +1862,7 @@ _Appears in:_
- [ExecNodesSpec](#execnodesspec)
- [HTTPProxiesSpec](#httpproxiesspec)
- [InstanceSpec](#instancespec)
- [KafkaProxiesSpec](#kafkaproxiesspec)
- [MasterCachesSpec](#mastercachesspec)
- [MastersSpec](#mastersspec)
- [QueryTrackerSpec](#querytrackerspec)
Expand Down Expand Up @@ -1959,6 +2006,7 @@ _Appears in:_
- [ExecNodesSpec](#execnodesspec)
- [HTTPProxiesSpec](#httpproxiesspec)
- [InstanceSpec](#instancespec)
- [KafkaProxiesSpec](#kafkaproxiesspec)
- [MasterCachesSpec](#mastercachesspec)
- [MastersSpec](#mastersspec)
- [QueryTrackerSpec](#querytrackerspec)
Expand Down Expand Up @@ -2215,6 +2263,7 @@ _Appears in:_
| `httpProxies` _[HTTPProxiesSpec](#httpproxiesspec) array_ | | | MinItems: 1 <br /> |
| `rpcProxies` _[RPCProxiesSpec](#rpcproxiesspec) array_ | | | |
| `tcpProxies` _[TCPProxiesSpec](#tcpproxiesspec) array_ | | | |
| `kafkaProxies` _[KafkaProxiesSpec](#kafkaproxiesspec) array_ | | | |
| `dataNodes` _[DataNodesSpec](#datanodesspec) array_ | | | MinItems: 1 <br /> |
| `execNodes` _[ExecNodesSpec](#execnodesspec) array_ | | | |
| `schedulers` _[SchedulersSpec](#schedulersspec)_ | | | |
Expand Down
Loading
Loading