Recently, CVE-2021-3156 was announced - a Linux sudo security issue which allows any local user to gain root privileges: https://cve.mitre.org/cgi-bin/cvename.cgi?name=2021-3156
This is very unpleasant, but a security fix has already been published.
Check which sudo version you have installed:
dpkg -l | grep sudo
According to the Debian security tracker (https://security-tracker.debian.org/tracker/CVE-2021-3156), for Debian 10 buster this was fixed in sudo version 1.8.27-1+deb10u3, so make sure the version of sudo you use is at least that version.
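A quick way to check this on the shell (the fixed version string is the one from the Debian tracker above):

installed=$(dpkg-query -W -f='${Version}' sudo)
echo "installed sudo: $installed"
# dpkg exits 0 if the installed version is greater than or equal to the fixed one
if dpkg --compare-versions "$installed" ge "1.8.27-1+deb10u3"; then
    echo "sudo is patched against CVE-2021-3156"
else
    echo "sudo is VULNERABLE, update now"
fi

The Qualys advisory also describes a direct test: run sudoedit -s / as a non-root user; a patched sudo answers with a message starting with "usage:", a vulnerable one with an error starting with "sudoedit:".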
If you're running a Debian-based system, it's very important to keep it updated on a regular basis. In order to do that across all your systems, you can create a cron job that applies updates automatically:
sudo tee /etc/cron.daily/update >/dev/null <<'EOF'
#!/bin/bash
apt-get update
apt-get upgrade -y
apt-get autoclean
EOF
sudo chmod 755 /etc/cron.daily/update
sudo service cron restart
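To verify that cron will actually pick the script up, you can ask run-parts which scripts it would execute (note that run-parts skips files whose names contain dots):

run-parts --test /etc/cron.daily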
After several attempts to do it properly, I've created this method, which works well and returns the latest X records from the Kafka topic you specify.
private Iterable<ConsumerRecord<String, T>> getLatestConsumerRecordsWithLimit(KafkaConsumer<String, T> consumer, String topic, long limit) {
    Iterable<ConsumerRecord<String, T>> records;
    synchronized (consumer) {
        consumer.subscribe(Collections.singletonList(topic));
        consumer.poll(0); // triggers joining the group and receiving the partition assignment
        consumer.seekToEnd(consumer.assignment());
        for (TopicPartition topicPartition : consumer.assignment()) {
            long actualPosition = consumer.position(topicPartition);
            long newPosition = actualPosition - limit;
            if (newPosition >= 0) {
                consumer.seek(topicPartition, newPosition);
            } else {
                // fewer than 'limit' records in this partition, read it from the start
                consumer.seekToBeginning(Collections.singletonList(topicPartition));
            }
        }
        ConsumerRecords<String, T> poll = consumer.poll(TIMEOUT_MS);
        records = poll.records(topic);
        consumer.unsubscribe();
    }
    return records;
}
Let's go through the individual blocks:
Iterable<ConsumerRecord<String, T>> getLatestConsumerRecordsWithLimit(KafkaConsumer<String, T> consumer, String topic, long limit)
T represents a generic type, so the method can be used, for example, as:
getLatestConsumerRecordsWithLimit(KafkaConsumer<String, Person> consumer, String topic, long limit)
or
getLatestConsumerRecordsWithLimit(KafkaConsumer<String, Employee> consumer, String topic, long limit)
synchronized (consumer) {
The Kafka consumer is not thread-safe by nature. If multiple threads accessed the method at the same time, the result would be unpredictable; Kafka itself throws a ConcurrentModificationException when it detects concurrent access. Therefore we synchronize on the consumer object, which allows only one thread at a time to enter the synchronized block.
consumer.poll(0);
consumer.seekToEnd(consumer.assignment());
we need to move the position in each assigned partition to the very end, so that we can determine the total number of records.
long actualPosition = consumer.position(topicPartition);
long newPosition = actualPosition - limit;
then we read the actual position at the very end; the new position is that end position minus the number of records we want to return.
if (newPosition >= 0) {
    consumer.seek(topicPartition, newPosition);
} else {
    consumer.seekToBeginning(Collections.singletonList(topicPartition));
}
it can happen that the partition holds fewer records than the limit; in that case we seek that partition back to the very beginning. Note that we seek only the current partition, not the whole assignment, so positions already set on other partitions are not reset.
ConsumerRecords<String, T> poll = consumer.poll(TIMEOUT_MS);
records = poll.records(topic);
consumer.unsubscribe();
and finally, we poll the records within the timeout and unsubscribe.
In order to use this method, you need to inject the Kafka consumer into your Spring Boot service:
private final KafkaConsumer<String, Person> kafkaConsumer;
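Where that consumer comes from is up to your configuration. A minimal sketch of such a bean could look like the following; the bootstrap server, the group id and the use of spring-kafka's JsonDeserializer are my assumptions, not something the original service prescribes (imports from org.apache.kafka.clients.consumer, org.apache.kafka.common.serialization and org.springframework.kafka.support.serializer omitted, as elsewhere in this post):

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public KafkaConsumer<String, Person> kafkaConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption: local broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "person-consumer");         // hypothetical group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName());
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Person.class.getName());
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); // relaxed for the sketch, restrict in production
        return new KafkaConsumer<>(props);
    }
}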
Then, within some other method in your service:
Iterable<ConsumerRecord<String, Person>> records = getLatestConsumerRecordsWithLimit(kafkaConsumer, "person_topic", limit);
List<Person> persons = StreamSupport.stream(Spliterators.spliteratorUnknownSize(records.iterator(), Spliterator.ORDERED), false)
        .filter(e -> key.equals(e.key()))
        .map(ConsumerRecord::value)
        .collect(Collectors.toList());
.filter(e -> key.equals(e.key()))
we need to make sure we return only the values whose key matches the key of the produced record. You should always produce records to a Kafka topic with a specific key, so that each record is assigned to the correct partition based on that key.
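For illustration, producing a record with an explicit key could look like this (a sketch with hypothetical names; JsonSerializer is again the spring-kafka one and person is an existing Person instance):

Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption: local broker
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());

try (KafkaProducer<String, Person> producer = new KafkaProducer<>(producerProps)) {
    String key = "person-42"; // records with the same key always end up in the same partition
    producer.send(new ProducerRecord<>("person_topic", key, person));
}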
This article is for users who are familiar with Kubernetes clusters, their basic principles, and Kafka; it solves a specific data-persistence problem related to running the Confluent Platform in a Kubernetes cluster. It took me several hours to study the principles and come up with this solution, which works great. It will hopefully help other users with the same use case.
Image Source: https://cloud.ibm.com/docs/openshift?topic=openshift-kube_concepts&locale=en
If you wanted to set up Kafka on a Kubernetes cluster together with ksqlDB, you must have found the neat helm chart from Confluent Platform, which eases the deployment of the whole platform onto your cluster.
This is the URL of the Confluent Platform helm chart: https://github.com/confluentinc/cp-helm-charts
By default, it starts 3 instances of Kafka, Zookeeper, ksql server, Kafka REST endpoints, Kafka connect and Schema Registry. You can use it right away and it's pre-configured the way that they all see each other and act as one platform.
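Getting it running is then basically (following the chart's README; the repository URL and the release name "confluent" are written from memory, double-check them against the repo):

helm repo add confluentinc https://confluentinc.github.io/cp-helm-charts/
helm repo update
helm install confluent confluentinc/cp-helm-charts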
When you uninstall the helm chart and the pods no longer exist, the data that were stored in the Kafka instances are gone. To prevent that, you should set this in values.yaml:
persistence:
  enabled: true
This creates persistent volume claims (PVCs), which look up persistent volumes (PVs) and bind to them.
But you would probably end up with something like this: several persistent volume claims (PVCs) stuck in Pending, unable to find a corresponding persistent volume (PV).
What is that? For Kafka, you should not use NFS (network file system) as the persistent volume. Why? Because it's sooo slow. What's the answer then?
First, you should create a storage class and name it e.g. local-storage. A storage class describes the "quality" of the storage service, i.e. how fast it is. For example, you can have a big but slow disk which you'd only use for backups, or a smaller but fast SSD. That is what StorageClass is about. Let's name ours just "local-storage".
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: local-storage
provisioner: kubernetes.io/no-provisioner
volumeBindingMode: WaitForFirstConsumer
In the case of local storage, you need to set both of the following: no-provisioner, because local volumes cannot be provisioned dynamically, and WaitForFirstConsumer, which delays binding until a pod is scheduled, so that the node constraint can be honored.
provisioner: kubernetes.io/no-provisioner
volumeBindingMode: WaitForFirstConsumer
Then, check out these lines:
local:
  path: /mnt/data/datadir-0-cp-helm-charts-cp-kafka-0
nodeAffinity:
  required:
    nodeSelectorTerms:
      - matchExpressions:
          - key: kubernetes.io/hostname
            operator: In
            values:
              - pc
Oh yeah, Kubernetes allows you to take a directory on a node and use it as storage. Why is it so cool? Because such local storage is usually very FAST, ideal for Kafka with its many data operations.
pc in this case is the name of the node where this mount directory exists.
Why is the nodeAffinity needed? Because pods can run on any node of your cluster; this is why we need to specify on which exact node this directory is present.
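Putting it together, a complete PersistentVolume manifest for one of the Kafka data directories could look like this. Treat it as a sketch: the name, size and reclaim policy are my assumptions and must match what your PVCs actually request.

apiVersion: v1
kind: PersistentVolume
metadata:
  name: datadir-0-cp-helm-charts-cp-kafka-0
spec:
  capacity:
    storage: 10Gi              # assumption: match the size requested by the PVC
  accessModes:
    - ReadWriteOnce
  persistentVolumeReclaimPolicy: Retain
  storageClassName: local-storage
  local:
    path: /mnt/data/datadir-0-cp-helm-charts-cp-kafka-0
  nodeAffinity:
    required:
      nodeSelectorTerms:
        - matchExpressions:
            - key: kubernetes.io/hostname
              operator: In
              values:
                - pc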
Reference to official documentation https://kubernetes.io/docs/concepts/storage/volumes/#local
Not so fast... another problem is that the Confluent Platform by default spawns...
Wazuh is a free, open-source host-based intrusion detection system (HIDS). It performs log analysis, integrity checking, Windows registry monitoring, rootkit detection, time-based alerting, and active response. It provides intrusion detection for most operating systems, including Linux, OpenBSD, FreeBSD, macOS, Solaris and Windows. Wazuh has a centralized, cross-platform architecture allowing multiple systems to be monitored and managed. (source: [wikipedia.org])
This article will show you how to easily add a Wazuh agent to a host you want to monitor (a Debian Linux distribution).
If you use the CSF firewall, allow the agent-to-manager traffic first. On the host you want to monitor, edit /etc/csf/csf.allow (the agent initiates the connections, so these are outgoing rules):
#wazuh monitoring
tcp|out|d=1514|d={IP of wazuh manager}
tcp|out|d=1515|d={IP of wazuh manager}
tcp|out|d=1516|d={IP of wazuh manager}
tcp|out|d=514|d={IP of wazuh manager}
csf -r
On the Wazuh manager, edit /etc/csf/csf.allow (incoming rules from the agent):
#wazuh monitoring
tcp|in|d=1514|s={IP of your host you want to monitor}
tcp|in|d=1515|s={IP of your host you want to monitor}
tcp|in|d=1516|s={IP of your host you want to monitor}
tcp|in|d=514|s={IP of your host you want to monitor}
csf -r
sudo apt-get install curl apt-transport-https lsb-release -y
curl -s https://packages.wazuh.com/key/GPG-KEY-WAZUH | sudo apt-key add -
echo "deb https://packages.wazuh.com/3.x/apt/ stable main" | sudo tee /etc/apt/sources.list.d/wazuh.list
sudo apt-get update
sudo apt-get install wazuh-agent
sudo vi /var/ossec/etc/ossec.conf
edit:
<ossec_config>
<client>
<server>
<address>MANAGER_IP</address>
<port>1514</port>
...
replace MANAGER_IP with the address of your own Wazuh manager, then register the agent with it:
/var/ossec/bin/agent-auth -m {IP of wazuh manager}
The service then needs to be restarted:
service wazuh-agent restart
A problem encountered while running the Wazuh monitoring solution (and its Kubernetes image) on older hardware: Elasticsearch keeps crashing on startup. The fix is to disable the X-Pack machine-learning module, which requires CPU instructions (SSE4.2) that older CPUs lack.
In elastic_stack/elasticsearch/single-node/elasticsearch-sts.yaml, in the "env" section:
env:
  - name: ES_JAVA_OPTS
    value: '-Xms1024m -Xmx1024m'
  - name: bootstrap.memory_lock
    value: 'false'
  - name: cluster.name
    value: wazuh
  - name: network.host
    value: '0.0.0.0'
  - name: node.name
    value: node-1
add:
  - name: xpack.ml.enabled
    value: 'false'
For the docker-compose deployment, edit docker-compose.yml:
# Wazuh App Copyright (C) 2020 Wazuh Inc. (License GPLv2)
version: '2'
services:
  wazuh:
    image: wazuh/wazuh:3.12.2_7.6.2
    hostname: wazuh-manager
    restart: always
    ports:
      - "1514:1514/udp"
      - "1515:1515"
      - "514:514/udp"
      - "55000:55000"
  elasticsearch:
    image: wazuh/wazuh-elasticsearch:3.12.2_7.6.2
    hostname: elasticsearch
    restart: always
    ports:
      - "9200:9200"
    environment:
      - "ES_JAVA_OPTS=-Xms1g -Xmx1g"
      - ELASTIC_CLUSTER=true
      - CLUSTER_NODE_MASTER=true
      - CLUSTER_MASTER_NODE_NAME=es01
add:
environment:
  ...
  - xpack.ml.enabled=false
Example: some pod needs JHipster Registry to be up before it can function. A snippet like this has to be added to the pod's deployment YAML descriptor:
containers:
  ...
  ...
initContainers:
  - name: init-myservice
    image: busybox:1.28
    command: ['sh', '-c', "until nslookup jhipster-registry.default.svc.cluster.local; do echo waiting; sleep 2; done"]
Easy as that. This allows the pods or containers to "heal" their state and makes sure pods start in the correct order, without explicitly running the deployment of each pod individually.
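While the registry's DNS name is not resolvable yet, the dependent pod stays in an Init state (e.g. Init:0/1), which you can watch with:

kubectl get pods -w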
matto@pc:~$ kubectl get pods --namespace=kube-system
NAME READY STATUS RESTARTS AGE
coredns-66bff467f8-j9lcr 1/1 Running 60 8m17s
coredns-66bff467f8-lf6vj 1/1 CrashLoopBackOff 99 8m17s
This CoreDNS crash loop is described at https://github.com/coredns/coredns/tree/master/plugin/loop#troubleshooting
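In short, CoreDNS forwards queries to the resolvers listed in the host's /etc/resolv.conf, and if that file points to a loopback address, the query loops straight back to CoreDNS. A typical offending file looks like this (illustrative content):

# /etc/resolv.conf as written by systemd-resolved or a similar local stack
nameserver 127.0.0.53   # loopback, CoreDNS forwarding here loops back to itself
# what you want instead is a real upstream resolver, for example:
nameserver 192.168.1.1  # assumption: your router or provider DNS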
The problem is that /etc/resolv.conf keeps getting overwritten by your network configuration. What helped me to keep its content:
chattr +i /etc/resolv.conf
This makes the file immutable.
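Should you need to edit the file again later, remove the flag first:
chattr -i /etc/resolv.conf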
Then restart the container runtime and kubelet, flush iptables, and recreate the CoreDNS pods:
sudo systemctl stop kubelet
sudo systemctl stop docker
sudo iptables --flush
sudo iptables -t nat --flush
sudo iptables -P FORWARD ACCEPT
sudo systemctl start docker
sudo systemctl start kubelet
sudo kubectl delete pods -n kube-system -l k8s-app=kube-dns
Then make sure that CoreDNS starts and that there are no restarts:
sudo kubectl get pods --namespace=kube-system
NAME READY STATUS RESTARTS AGE
coredns-66bff467f8-bx5zc 1/1 Running 0 62s
coredns-66bff467f8-xv5sd 1/1 Running 0 62s
There should be no restarts!
Just contributed to the great Cassandra CQL exporter tool on GitHub:
List of my changes:
I hope that the author will merge that pull request :)
Cassandra allows you to do backups using the nodetool snapshot command.
But this snapshot is not stored in .CQL format (which lists the actual CQL statements just as you'd type them on the CQL command line).
I came around this great exporter: https://github.com/thangbn/cassandra-CQL-exporter
You can use it as specified in the README file, but it doesn't mention anything about using an SSL connection to connect to Cassandra.
After studying the source code, I found it handles a special parameter "-secure", which is not documented in the README file.
The CQL exporter has its cql-generator.sh:
which contains:
java -jar cql-generator.1.0.jar "$@"
But to let Java recognize the SSL settings, you need to pass the javax.net.ssl.trustStore and javax.net.ssl.trustStorePassword system properties.
With OpenJDK (openjdk version "1.8.0_242", OpenJDK Runtime Environment (AdoptOpenJDK) build 1.8.0_242-b08, OpenJDK 64-Bit Server VM (AdoptOpenJDK) build 25.242-b08, mixed mode), specifying these parameters just via the JAVA_OPTS environment variable does not work! The plain java launcher ignores JAVA_OPTS; it is only a convention honored by some wrapper scripts (see: https://stackoverflow.com/questions/2011311/running-java-with-java-opts-env-variable-has-no-effect)
so you need to modify cql-generator.sh manually:
export JAVA_OPTS="-Djavax.net.ssl.trustStore=/etc/cassandra/cassandra.truststore -Djavax.net.ssl.trustStorePassword=..."
java $JAVA_OPTS -jar cql-generator.1.0.jar "$@"
Great. Now let's run cql-generator.sh with the hidden "-secure" parameter to make sure SSL works:
./cql-generator.sh -secure -h 12.34.46.78 -k my_keyspace
This approach produces backups that are easy to read and to restore, even selectively if only some part is needed.
You can also use it to migrate your data to ScyllaDB, which understands these universal CQL files.
Result:
Trying connect to host "12.34.46.78"
Success!
Trying connect to port "9042"
Success!
Trying connect to keyspace "my_keyspace"
Success!
All good!
Start exporting...
Write DDL to ~/export/my_keyspace.CQL
-----------------------------------------------
Extract from my_keyspace.mytable
Total number of record: 34244
Start write "mytable" data DML to ~/export/my_keyspace.CQL
Done 0.00%
Done 20.00%
Done 40.00%
Done 60.00%
Done 79.99%
Done 99.99%
Done exporting "mytable", total number of records exported: 36296
-----------------------------------------------
Export completed after 23.97 s!
Exited.
To run a backup of all keyspaces:
CASSANDRA_IP=12.34.46.78
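# list all keyspaces; note that the output also includes the system keyspaces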
for i in `cqlsh --ssl $CASSANDRA_IP --connect-timeout=10000 -e "describe keyspaces;" | tr '\n' ' '`
do
./cql-generator.sh -secure -h $CASSANDRA_IP -k $i
done
That's it! Enjoy!
Matto
Intro: ScyllaDB is a NoSQL database, a rewrite of the Apache Cassandra NoSQL database, which claims better performance and all the other fancy stuff.
When attempting to install ScyllaDB as a Cassandra replacement, I found out that their download page:
https://www.scylladb.com/download/open-source/scylla-debian9/
requires registration. This is a total SHOWSTOPPER.
Tried to google the term "scylladb repo".
That leads to the install script with the repository URLs:
https://github.com/scylladb/scylla/blob/master/scripts/scylla_current_repo
From there, you can easily figure out that it points to the URL:
http://downloads.scylladb.com/deb/debian/
So, all you need to do is edit the Debian repository list:
sudo vi /etc/apt/sources.list
and add:
deb [arch=amd64] http://s3.amazonaws.com/downloads.scylladb.com/downloads/scylla/deb/debian/scylladb-3.3 stretch non-free
then:
sudo apt update
sudo apt install scylla   # the package is named scylla, not scylladb
I've tried this on Debian 10 buster, but after the installation there was some strange error.
The safer way is to install it as a Docker image:
docker pull scylladb/scylla
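To actually start a node from the image (the container name some-scylla is arbitrary; the commands follow ScyllaDB's Docker Hub instructions):

docker run --name some-scylla -d scylladb/scylla
# verify the node is up
docker exec -it some-scylla nodetool status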
Image source: https://miro.medium.com/max/800/1*hNNOkeWSi4sMwkjMk18L6w.png