Jak se v Jave resi Consumer Producer, kdyz chci mit filtry? – Java – Fórum – Programujte.com
 x   TIP: Přetáhni ikonu na hlavní panel pro připnutí webu

Jak se v Jave resi Consumer Producer, kdyz chci mit filtry? – Java – Fórum – Programujte.comJak se v Jave resi Consumer Producer, kdyz chci mit filtry? – Java – Fórum – Programujte.com

 

28. 8. 2019   #1
-
0
-

V informatice se v operacnich systemech, resp. jejich navrhu a interni funkcionalite, uci tzn. Producer-Consumer problem, a take Reader-Writer, a Bounded Buffer (ohraniceny buffer, napr. pole s velikosti 1000). 

Softwarova architektura zase uci koncept Pipes and Filters, tzn. kdyz mate God class se 100 funkcemi rozsekate je na 100 trid. Kazda trida je pak bud Producer, nebo Consumer, nebo oboje. Komunikace mezi nimi je pres Queue (pub/sub). 

Jak to naprogramovat, aby kdyz Consumer nestiha (= narusta mu queue.size) automaticky spustil vice threadu? Je to pak teprve pro multi core CPU.

Nahlásit jako SPAM
IP: 78.102.218.–
Jerry
~ Anonymní uživatel
504 příspěvků
29. 8. 2019   #2
-
0
-

#1 verejneuzitecnysw
tahle úloha se učila už v 90 letech na některých vš. ta třída musí mít modul komunikace aby zastavila vysílání a nezahltila "kanal"  jinak samozařejmě bys měl vědět, že "naflákat" 100 virtuálních vláken nikame nevede protože protože pokud to uděláš na procesoru třeba s 6ti jádry tak se stejně nic nestane a všechno se zahltí, vlákna musí mít nějakej plánovač jak se s pouští - ten sice má mikrokód procesoru a spolupracuje s memory managementem jádra operačního systému a rozděluje zátěž rovnoměrně na jednotlivá jádra, ale má to háček, když vlákno má přidělenou priority RealTime nebo podobnou tak ti vytuhne celej systém (OS) než se uspokojí požadavky všech vláken a pokud je to nekonečná smyčka tak ti prostě vytuhne počítač nadobro. Takže si musíš napsat krátkej prográmek pro komunikaci tříd, které obsluhují vlákna resp. který generujou a čtou zprávy. Už to chápeš ? Můžeš použít i sdílenou paměť. Jesti to děláš pod .NET je tu MemoryStream jinak powindows v native C++ je možnosti zasílání zpráv - PostMessage a další funkce...

https://docs.microsoft.com/en-us/windows/win32/api/winuser/nf-winuser-sendmessage

https://docs.microsoft.com/en-us/previous-versions//cc768129(v=technet.10)

https://docs.microsoft.com/en-us/windows/win32/sysinfo/kernel-objects

https://docs.microsoft.com/en-us/windows/win32/api/sysinfoapi/nf-sysinfoapi-getlogicalprocessorinformation

https://docs.microsoft.com/en-us/windows/win32/api/processthreadsapi/nf-processthreadsapi-isprocessorfeaturepresent

a tady je jak se zjistí zátěž procesoru

https://stackoverflow.com/questions/23143693/retrieving-cpu-load-percent-total-in-windows-with-c

ono to neni až tak jednoduchý takže bych ti doporučil dopsat si jenom tu třídu co bude sledovat co se dejě ....

Nahlásit jako SPAM
IP: 2a00:1028:83be:235a:ddaa:4682:16dd:8642...–
Jerry
~ Anonymní uživatel
504 příspěvků
29. 8. 2019   #3
-
0
-

#1 verejneuzitecnysw
jo ještě drobnost jesstli chceš pochopit jak to funguje je tu knížka třeba Principy operačního systému Unix, Maurice J. Bach, ISBN 80-901507-0-5, je to sice povídání o Unixu, ale pro Windowsd to funguje naprosto identicky. řízení procesů máš na stránce 218.

Nahlásit jako SPAM
IP: 2a00:1028:83be:235a:ddaa:4682:16dd:8642...–
29. 8. 2019   #4
-
0
-

#2 Jerry
Je to na virtualnim stroji (JVM). Takze aby se automaticky paralelizoval pomaly consumer, producer musi

1. detekovat, ze pomalemu consumerovi narusta queue.size

2. omezit komunikaci, aby nezahltil consumera jeste vice

3. notifikovat Scheduler, at paralelizuje consumera

Takhle?

Scheduler pak musi spustit vice threadu s consumerem. Jak to ale provede, kdyz je tam jedna queue a maji z ni cist treba 4 paralelizovani consumeri? Napada me jen synchronizovat zapis/cteni v te queue, a to pak ostatni cekaji zatimco jeden thread exklusivne pristupuje ke queue pro cteni ci zapis.

Vim, ze existuje work stealing scheduling, tzn. kdyz nejaky thread vyhladovi ukradne praci z jineho threadu. Jak to cele spojit do jednoho konceptu? Je nekde demo nebo powerpoint slides, nebo neco, abych ten Producer-Consumer Scheduler nevynalezal znovu, a na kolene?

Nahlásit jako SPAM
IP: 69.16.145.–
Kit+15
Guru
29. 8. 2019   #5
-
0
-

#4 verejneuzitecnysw
Udělal bych jedinou frontu na producentovi, kterou nebude problém ohlídat.

Mrkni na návrhový vzor Pool.

Nahlásit jako SPAM
IP: 46.135.95.–
Komentáře označují místa, kde programátor udělal chybu nebo něco nedodělal.
Jerry
~ Anonymní uživatel
504 příspěvků
29. 8. 2019   #6
-
0
-

#4 verejneuzitecnysw
jo mám někde tu verzi z vš .. když počkáš do večera tak se kouknu .. a nakopčim ti ji sem ... ale je to je školí uloha..ta knížka s Bachem - unix -  ta popisuje reálné jádro Unixu/Linuxu/windows oni to maj všichni přibližně stejný ...

nezapomeň že když budeš mít jednu fronu a z ní budou všichni číst tak při každým čtení bude muset to vlákno co z ní čte procházet celou frontu a hledat v ní zprávy co jsou určený jen pro sebe .. chápeš jo ? proto se vypaltí mít frontu zpráv každej object s každým vláknem ...a ukazatel zaplnění ...

další možností je, že frontu uděláš jako inteligentní třídu a po přijetí požadafku ti sama vystaví zprávu pokud něuaká existuje .. možností je mnoho ...

Nahlásit jako SPAM
IP: 2a00:1028:83be:235a:c0ca:b5f:928c:d750...–
gna
~ Anonymní uživatel
1851 příspěvků
29. 8. 2019   #7
-
0
-

#4 verejneuzitecnysw
abych ten Producer-Consumer Scheduler nevynalezal znovu, a na kolene

Ten jeden for nebo if, ve kterém vytvoříš vlákno? :))

Jinak takové věci samozřejmě Java umí.

Nahlásit jako SPAM
IP: 213.211.51.–
Jerry
~ Anonymní uživatel
504 příspěvků
29. 8. 2019   #8
-
0
-

příklad je tady

https://uloz.to/file/WOA8bpb0uCWm/os-producent-konzument-rar

je tam víc semestrálek je tam všechno co sme v daným semestru dělali na základy operačních systémů no je toho docela dost...

v Javě to teda nemám protože tenkdát Java eště neexistovala, je to samozřejmě v pascalu, assembleru a céčku, .. pascal je tam přiloženej  :)

winrar je tady

https://uloz.to/file/9X9yxaUug/winrar-zip


tady máš VMWare ve kterým se to dá všechno spouštět

https://uloz.to/file/VNHCq13gWu1f/msdos-rar

stačí ksyž si stáhneš VMWare player v15 ten je zadara... do vmware se vše instaluje přes USB FDD floppy mechaniku !!!! protože je tam MSDOS 6.22, připojit USB disk uměl až MSDOS 7...

já bych to napsal tak, že bych na každý vlákno dynamicky vytvořil instanci "vhodné" třídy která bude obsahovat vlastní frontu a metody GET a PUT neboli  READ a INSERT  a CHECKQUEUE která ti vrací Boolean jestli je možné zapisovat nebo ne ... a producent pak nejprve spustí metodu CHECKQUEUE a pokdu vrátí TRUE tak se muže zapisovat další zpráva ... asi tak nějak .. .

no a co se týká Javy tak příkladů je plnej internet .. stačí do googlu zadat heslo "Java Producent Consument" a vyskočí ti spousta hotovejch řešení ... třeba tady je jedno

https://www.geeksforgeeks.org/producer-consumer-solution-using-threads-java/

a tady další ... a další ...

https://www.tutorialspoint.com/javaexamples/thread_procon.htm

takže v principu nechápu proč se vlastně ptáš ..

Nahlásit jako SPAM
IP: 2a00:1028:83be:235a:c0ca:b5f:928c:d750...–
30. 8. 2019   #9
-
0
-

#8 Jerry
Diky moc za archiv! Podivam se, zda pujde neco pouzit.

Producer/Consumer je jen pattern, ale zasekl jsem kdyz je potreba mit 100 filtru, kde jeden je Producer a 99 dela oboje (Consumer i Producer). Museji se totiz spustit v urcitem poradi, protoze vystup jednoho je vstup druheho. A nejpomalejsi seriovy filtr se musi automaticky detekovat pres narustajici queue.size, a pak pres Scheduler paralelizovat.

V prikladech, ktere jsou v Jave je Producer/Consumer bez Scheduleru a bez Pipeline. Pipeline je ten Filter Graph, to poradi ve kterem se maji spoustet. Scheduler je zase ten ForkJoinPool s work stealing, pres ktery ma bezet jen 8 threadu v jednu dobu.

tzn. jsem se zasekl na dynamickem load balancingu v pripade, ze Producer/Consumer je tam treba 100x zasebou, a ja nechci mit 100 threadu najednou, a chci mit tu pipeline spustenou v poradi, v jakem je v Grafu (pro jednoduchost muze byt Pipepine Set. Napr. Set<Step> pipeline ...; A pak StepA, StepB, StepC, StepD - 4 filtry). StepA je Producer, StepB je oboje (Consumer/Producer), StepC je oboje, StepD je Consumer. Ted za behu chci zjistit, zda narusta nekde queue a paralelizovat toho nejpomalejsiho filtra. (treba StepB). 

Ten koncept znovu vynalezam na kolene. No, kez by to byla jedna podminka. Jde o ten cely koncept, at se to load balancuje za behu, vzdy v tom bode kde je to nejpomalejsi, a at to nedela blbosti (neskace nesmyslne sem a tam v poctu najednou spustenych threadu). Jak mam zatez (load) jednotlivych queue merit, aby to paralelizovalo chytre ten nejpomalejsi seriovy filtr, a bez blbosti? 

Nahlásit jako SPAM
IP: 78.102.218.–
Jerry
~ Anonymní uživatel
504 příspěvků
30. 8. 2019   #10
-
0
-

#9 verejneuzitecnysw
já se přiznám že uplně tak nechápu co chceš nebo co vlastně máš za domácí úkol ..

zadání si sem nedal

to je do školy ? do práce ???

100 threadů stejně najednou nespustíš mikrokod procesoru a hlavně operační systém to bude přepínat podle potřeby ...

jestli máš 100 procesů spojených seriově tak podle mě ten nejpomalejší můžeš rozdělit na víc podůloh

Nahlásit jako SPAM
IP: 2a00:1028:83be:235a:d85c:712e:6594:c0e0...–
31. 8. 2019   #11
-
0
-

#10 Jerry 

ahoj, je to hobby projekt. Dobre jsi mi poradil ten scheduler. Ted jeste jak ho naprogramovat. To je cele.

Nechapu jak se programuje scheduler. Jen vim co ma teoreticky delat. 8 threadu scheduler ma nechat v running state a zbytek mit v queue (ready state). V Jave je pool pattern implementovan pres ForkJoinPool (thread pool s work stealing), ale nikdy jsem to nepouzil.

muj zdrojak vypada takhle (nize).

Scheduler musi v runtime vytvorit vice threadu consumera (scale up) nebo thready omezit (scale down) v zavislosti na queue.size():  

// A filter can produce, consume, or both combined. 

import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

public class BackpressurePullSolution {
    public static void main(String[] args) throws Exception {
        Producer producer = new Producer();
        Consumer consumer = new Consumer();
        producer.start();
        consumer.start();
        producer.join();

        // we only get the latest 10 items when we are ready (they can be updated in any velocity. We skip updates while we are busy)
        BoundedQueue queue = BoundedQueue.getInstance();
        do {
            Thread.yield();
        } while (queue.getSize() != 0);
        consumer.interrupt();       
    }

    // Producer is a filter
    private static class Producer extends Thread {
        @Override
        public void run() {
            BoundedQueue queue = BoundedQueue.getInstance();
            for (var i = 0; i < 1_000_000; i++) {
                queue.offer("Hello - " + queue.getSize());
            }
        }
    }

    // Consumer is a filter
    private static class Consumer extends Thread {
        @Override
        public void run() {
            BoundedQueue queue = BoundedQueue.getInstance();
            do {
                queue.request(10);
                for (var i = 0; i < 10; i++) {
                    String message = queue.poll();
                    if (message != null) {
                        System.out.println("message: " + message);
                    }
                }
            } while (!Thread.interrupted());
        }
    }

    private static class BoundedQueue {
        private Queue<String> queue;
        private static final int CAPACITY = 100_000;
        private static BoundedQueue instance;
        private final Object lock = new Object();
        private boolean requesting = false;
        private int request = 0;

        private BoundedQueue(int capacity) {
            queue = new ArrayBlockingQueue<>(capacity);
        }

        public static synchronized BoundedQueue getInstance() {
            if (instance == null)
                instance = new BoundedQueue(CAPACITY);

            return instance;
        }

        public boolean offer(String message) {
            synchronized (lock) {
                if (queue.size() == CAPACITY) {
                    // full capacity
                    System.out.println("Applying backpressure strategy - drop all");
                    queue.clear(); // drop all to make space for fresh data
                }

                if (requesting) {
                    request--;
                    if (request == 0) {
                        requesting = false;
                    }

                    return queue.offer(message);
                } else
                    return false;
            }
        }

        public int getSize() {
            return queue.size();
        }

        public synchronized void request(int n) {
            requesting = true;
            request = n;
        }

        public String poll() {
            synchronized (lock) {
                if (queue.isEmpty())
                    return null;

                return queue.poll();
            }
        }
    }
}
Nahlásit jako SPAM
IP: 209.107.192.–
Jerry
~ Anonymní uživatel
504 příspěvků
31. 8. 2019   #12
-
0
-

#11 verejneuzitecnysw
máš to hezký, já Javu neumim... dělal sem něco podobnýho pro native C a C++ a pak C++/CLI pro jednojádrový procesory a i vícejádrový. Nevim jak je na tom Java, ale C++/CLI používá pro .NET ThreadPriority flag

https://docs.microsoft.com/cs-cz/dotnet/api/system.threading.threadpriority?view=netframework-4.8

https://www.geeksforgeeks.org/c-sharp-thread-priority-in-multithreading/

tim si určíš jak rychle vlákno poběží, ideálně platí, že ThreadPriority.Highest dáš jen tolika vláknům kolik je na procesoru jader (nebo možných vláken), tim pádem zatížíš procesor na 99 procent a už ti nebude reagovat na vstupy např. z klávesnice a tak...

jinak se to dělá tak, že ta část kodu, která tvoří vstupní bod každého vlákna pro daný thread tvoří lokálního administrátora vlákna a ten zavolá výpočetní jádro třídy. stejně tak můžeš ne-mít lokálního administrátora a nechat vše na jednom mater administrátorovi samostatné třídě, která vše administruje, ale nezapomeň že ten vyžaduje minimálně 5 procent výkonu procesoru. Nevim jak je to v Javě ale pro .NET při vytvoření třídy "Filter" příkazem gcnew/new získáš handle dané třídy. tato třída musí mít předdefinované metody třeba admin, kterého když zavoláš z hlavního administrátora tak se spustí např. jedna smyčka výpočtu a pak se vlákno uvolní. V C# na injektáž kodu filtru je tzv. dynamický kompilátor z projektu Roslyn. Jak je na tom Java opravdu nevim ...

Nahlásit jako SPAM
IP: 109.81.214.–
Jerry
~ Anonymní uživatel
504 příspěvků
31. 8. 2019   #13
-
0
-

#11 verejneuzitecnysw
máš mnoho možností, buď naprogramuješ spustíš více java/jar (říkejme mu EXE) souborů necháš přepínání na operačním systému, který má ale tendenci průměrovat výkon takže všechny thready budou dostávat stejný výpočetní čas - za předpokladu, že Priority je nastaveno na NORMAL. pak můžeš v každém takovém EXE souboru naprogramovat, že pokud je jedel libovolný spuštěn jako první, stává se MASTER a dostává prioritu RealTime a ostatní Iddle, tohle funguje na jednojádrovým prcesoru. Na vícejádrovým spouštíš více masterů a detekce který se má spustit se dělá  např. přes systémově sdílenou paměť nebo zasíláním zpráv nebo jednoduše synchronizačním souborem na disku, kam se zapíše kdo je spuštěnej a v jakým režimu běží, každý EXE soubor si pak přečte kolik je spuštěných RealTime vláken a jestli se má přepnout z modu Iddle to RealTime. Znamná to, že všechny spuštěné exe soubory jenom v určitých intervalech čtou ten datový soubor nebo sdílenou paměť atd ... Je to nejjednodušší a nejrychleji se to naprogramuje....

pak je ta varianta producent-konsumer ale jak sem psal tak si vytvoříš jednu universální výpočetní třídu, která bude v sobě obsahovat jak producenta tak toho požírače :) tak i výpočetní část, kterou můžeš uzpůsobit tak, že vytvoříš tzv. "kernel" což je kod pro jeden výpočetní cyklus tvého výpočtu a ten se spustí pokaždé když dostane možnost běžet.

Nahlásit jako SPAM
IP: 109.81.214.–
Zjistit počet nových příspěvků

Přidej příspěvek

Toto téma je starší jak čtvrt roku – přidej svůj příspěvek jen tehdy, máš-li k tématu opravdu co říct!

Ano, opravdu chci reagovat → zobrazí formulář pro přidání příspěvku

×Vložení zdrojáku

×Vložení obrázku

Vložit URL obrázku Vybrat obrázek na disku
Vlož URL adresu obrázku:
Klikni a vyber obrázek z počítače:

×Vložení videa

Aktuálně jsou podporována videa ze serverů YouTube, Vimeo a Dailymotion.
×
 
Podporujeme Gravatara.
Zadej URL adresu Avatara (40 x 40 px) nebo emailovou adresu pro použití Gravatara.
Email nikam neukládáme, po získání Gravatara je zahozen.
-
Pravidla pro psaní příspěvků, používej diakritiku. ENTER pro nový odstavec, SHIFT + ENTER pro nový řádek.
Sledovat nové příspěvky (pouze pro přihlášené)
Sleduj vlákno a v případě přidání nového příspěvku o tom budeš vědět mezi prvními.
Reaguješ na příspěvek:

Uživatelé prohlížející si toto vlákno

Uživatelé on-line: 0 registrovaných, 7 hostů

Moderátoři diskuze

 

Hostujeme u Českého hostingu       ISSN 1801-1586       ⇡ Nahoru Webtea.cz logo © 20032024 Programujte.com
Zasadilo a pěstuje Webtea.cz, šéfredaktor Lukáš Churý